diff --git a/src/ipv6rwc/ipv6rwc.go b/src/ipv6rwc/ipv6rwc.go index ef2e49f2..59f4f022 100644 --- a/src/ipv6rwc/ipv6rwc.go +++ b/src/ipv6rwc/ipv6rwc.go @@ -4,7 +4,6 @@ import ( "crypto/ed25519" "errors" "fmt" - "io" "net" "sync" "time" @@ -16,7 +15,6 @@ import ( "github.com/yggdrasil-network/yggdrasil-go/src/address" "github.com/yggdrasil-network/yggdrasil-go/src/core" - "github.com/yggdrasil-network/yggdrasil-go/src/tun" ) const keyStoreTimeout = 2 * time.Minute @@ -339,40 +337,11 @@ func (k *keyStore) MTU() uint64 { type ReadWriteCloser struct { keyStore - ch chan []byte - errch chan error -} - -const bufPoolSize = tun.TUN_OFFSET_BYTES + 65535 - -var bufPool = sync.Pool{ - New: func() any { - b := [bufPoolSize]byte{} - return b[:] - }, } func NewReadWriteCloser(c *core.Core) *ReadWriteCloser { rwc := new(ReadWriteCloser) rwc.init(c) - rwc.ch = make(chan []byte, tun.TUN_MAX_VECTOR) - rwc.errch = make(chan error, 1) - go func() { - for { - p := bufPool.Get().([]byte)[:bufPoolSize] - n, err := rwc.readPC(p[:]) - if err != nil || n == 0 { - if err == nil { - err = io.EOF - } - rwc.errch <- err - close(rwc.errch) - close(rwc.ch) - return - } - rwc.ch <- p[:n] - } - }() return rwc } @@ -385,32 +354,7 @@ func (rwc *ReadWriteCloser) Subnet() address.Subnet { } func (rwc *ReadWriteCloser) Read(p []byte) (n int, err error) { - msg := <-rwc.ch - if msg == nil { - return 0, <-rwc.errch - } - return copy(p, msg), nil -} - -func (rwc *ReadWriteCloser) ReadMany(b [][]byte, sizes []int, offset int) (c int, err error) { - var lb int - if c, lb = len(rwc.ch), len(b); c > lb { - c = lb - } - if c == 0 { - // If nothing is waiting yet then we should block - // for the next packet only. - c = 1 - } - for i := 0; i < c; i++ { - msg := <-rwc.ch - if msg == nil { - return i, <-rwc.errch - } - sizes[i] = offset + copy(b[i][offset:], msg) - bufPool.Put(msg) // nolint:staticcheck - } - return + return rwc.readPC(p) } func (rwc *ReadWriteCloser) Write(p []byte) (n int, err error) { diff --git a/src/tun/iface.go b/src/tun/iface.go index 4dee461c..2028b130 100644 --- a/src/tun/iface.go +++ b/src/tun/iface.go @@ -31,23 +31,38 @@ func (tun *TunAdapter) read() { } } -func (tun *TunAdapter) write() { - vs := tun.idealBatchSize() - bufs := make([][]byte, vs) - sizes := make([]int, vs) - for i := range bufs { - bufs[i] = make([]byte, TUN_OFFSET_BYTES+65535) - } +func (tun *TunAdapter) queue() { for { - n, err := tun.rwc.ReadMany(bufs, sizes, TUN_OFFSET_BYTES) + p := bufPool.Get().([]byte)[:bufPoolSize] + n, err := tun.rwc.Read(p) if err != nil { tun.log.Errorln("Exiting TUN writer due to core read error:", err) return } + tun.ch <- p[:n] + } +} + +func (tun *TunAdapter) write() { + vs := cap(tun.ch) + bufs := make([][]byte, vs) + for i := range bufs { + bufs[i] = make([]byte, TUN_OFFSET_BYTES+65535) + } + for { + n := len(tun.ch) + if n == 0 { + n = 1 // Nothing queued up yet, wait for it instead + } + for i := 0; i < n; i++ { + msg := <-tun.ch + bufs[i] = append(bufs[i][:TUN_OFFSET_BYTES], msg...) + bufPool.Put(msg) // nolint:staticcheck + } if !tun.isEnabled { continue // Nothing to do, the tun isn't enabled } - if _, err = tun.iface.Write(bufs[:n], TUN_OFFSET_BYTES); err != nil { + if _, err := tun.iface.Write(bufs[:n], TUN_OFFSET_BYTES); err != nil { tun.Act(nil, func() { if !tun.isOpen { tun.log.Errorln("TUN iface write error:", err) diff --git a/src/tun/tun.go b/src/tun/tun.go index d218698c..027dfd00 100644 --- a/src/tun/tun.go +++ b/src/tun/tun.go @@ -10,6 +10,7 @@ import ( "fmt" "io" "net" + "sync" "time" "github.com/Arceliar/phony" @@ -24,7 +25,6 @@ type MTU uint16 type ReadWriteCloser interface { io.ReadWriteCloser - ReadMany([][]byte, []int, int) (int, error) // Vectorised reads Address() address.Address Subnet() address.Subnet MaxMTU() uint64 @@ -50,6 +50,7 @@ type TunAdapter struct { name InterfaceName mtu InterfaceMTU } + ch chan []byte } // Gets the maximum supported MTU for the platform based on the defaults in @@ -161,6 +162,8 @@ func (tun *TunAdapter) _start() error { tun.rwc.SetMTU(tun.MTU()) tun.isOpen = true tun.isEnabled = true + tun.ch = make(chan []byte, tun.idealBatchSize()) + go tun.queue() go tun.read() go tun.write() return nil @@ -194,3 +197,12 @@ func (tun *TunAdapter) _stop() error { } return nil } + +const bufPoolSize = TUN_OFFSET_BYTES + 65535 + +var bufPool = sync.Pool{ + New: func() any { + b := [bufPoolSize]byte{} + return b[:] + }, +}