Contain vectorisation changes to the TUN package

This commit is contained in:
Neil Alexander 2024-07-18 22:57:28 +01:00
parent c38544014c
commit 115d8a3b9d
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
3 changed files with 38 additions and 67 deletions

View file

@ -4,7 +4,6 @@ import (
"crypto/ed25519" "crypto/ed25519"
"errors" "errors"
"fmt" "fmt"
"io"
"net" "net"
"sync" "sync"
"time" "time"
@ -16,7 +15,6 @@ import (
"github.com/yggdrasil-network/yggdrasil-go/src/address" "github.com/yggdrasil-network/yggdrasil-go/src/address"
"github.com/yggdrasil-network/yggdrasil-go/src/core" "github.com/yggdrasil-network/yggdrasil-go/src/core"
"github.com/yggdrasil-network/yggdrasil-go/src/tun"
) )
const keyStoreTimeout = 2 * time.Minute const keyStoreTimeout = 2 * time.Minute
@ -339,40 +337,11 @@ func (k *keyStore) MTU() uint64 {
type ReadWriteCloser struct { type ReadWriteCloser struct {
keyStore keyStore
ch chan []byte
errch chan error
}
const bufPoolSize = tun.TUN_OFFSET_BYTES + 65535
var bufPool = sync.Pool{
New: func() any {
b := [bufPoolSize]byte{}
return b[:]
},
} }
func NewReadWriteCloser(c *core.Core) *ReadWriteCloser { func NewReadWriteCloser(c *core.Core) *ReadWriteCloser {
rwc := new(ReadWriteCloser) rwc := new(ReadWriteCloser)
rwc.init(c) rwc.init(c)
rwc.ch = make(chan []byte, tun.TUN_MAX_VECTOR)
rwc.errch = make(chan error, 1)
go func() {
for {
p := bufPool.Get().([]byte)[:bufPoolSize]
n, err := rwc.readPC(p[:])
if err != nil || n == 0 {
if err == nil {
err = io.EOF
}
rwc.errch <- err
close(rwc.errch)
close(rwc.ch)
return
}
rwc.ch <- p[:n]
}
}()
return rwc return rwc
} }
@ -385,32 +354,7 @@ func (rwc *ReadWriteCloser) Subnet() address.Subnet {
} }
func (rwc *ReadWriteCloser) Read(p []byte) (n int, err error) { func (rwc *ReadWriteCloser) Read(p []byte) (n int, err error) {
msg := <-rwc.ch return rwc.readPC(p)
if msg == nil {
return 0, <-rwc.errch
}
return copy(p, msg), nil
}
func (rwc *ReadWriteCloser) ReadMany(b [][]byte, sizes []int, offset int) (c int, err error) {
var lb int
if c, lb = len(rwc.ch), len(b); c > lb {
c = lb
}
if c == 0 {
// If nothing is waiting yet then we should block
// for the next packet only.
c = 1
}
for i := 0; i < c; i++ {
msg := <-rwc.ch
if msg == nil {
return i, <-rwc.errch
}
sizes[i] = offset + copy(b[i][offset:], msg)
bufPool.Put(msg) // nolint:staticcheck
}
return
} }
func (rwc *ReadWriteCloser) Write(p []byte) (n int, err error) { func (rwc *ReadWriteCloser) Write(p []byte) (n int, err error) {

View file

@ -31,23 +31,38 @@ func (tun *TunAdapter) read() {
} }
} }
func (tun *TunAdapter) write() { func (tun *TunAdapter) queue() {
vs := tun.idealBatchSize()
bufs := make([][]byte, vs)
sizes := make([]int, vs)
for i := range bufs {
bufs[i] = make([]byte, TUN_OFFSET_BYTES+65535)
}
for { for {
n, err := tun.rwc.ReadMany(bufs, sizes, TUN_OFFSET_BYTES) p := bufPool.Get().([]byte)[:bufPoolSize]
n, err := tun.rwc.Read(p)
if err != nil { if err != nil {
tun.log.Errorln("Exiting TUN writer due to core read error:", err) tun.log.Errorln("Exiting TUN writer due to core read error:", err)
return return
} }
tun.ch <- p[:n]
}
}
func (tun *TunAdapter) write() {
vs := cap(tun.ch)
bufs := make([][]byte, vs)
for i := range bufs {
bufs[i] = make([]byte, TUN_OFFSET_BYTES+65535)
}
for {
n := len(tun.ch)
if n == 0 {
n = 1 // Nothing queued up yet, wait for it instead
}
for i := 0; i < n; i++ {
msg := <-tun.ch
bufs[i] = append(bufs[i][:TUN_OFFSET_BYTES], msg...)
bufPool.Put(msg) // nolint:staticcheck
}
if !tun.isEnabled { if !tun.isEnabled {
continue // Nothing to do, the tun isn't enabled continue // Nothing to do, the tun isn't enabled
} }
if _, err = tun.iface.Write(bufs[:n], TUN_OFFSET_BYTES); err != nil { if _, err := tun.iface.Write(bufs[:n], TUN_OFFSET_BYTES); err != nil {
tun.Act(nil, func() { tun.Act(nil, func() {
if !tun.isOpen { if !tun.isOpen {
tun.log.Errorln("TUN iface write error:", err) tun.log.Errorln("TUN iface write error:", err)

View file

@ -10,6 +10,7 @@ import (
"fmt" "fmt"
"io" "io"
"net" "net"
"sync"
"time" "time"
"github.com/Arceliar/phony" "github.com/Arceliar/phony"
@ -24,7 +25,6 @@ type MTU uint16
type ReadWriteCloser interface { type ReadWriteCloser interface {
io.ReadWriteCloser io.ReadWriteCloser
ReadMany([][]byte, []int, int) (int, error) // Vectorised reads
Address() address.Address Address() address.Address
Subnet() address.Subnet Subnet() address.Subnet
MaxMTU() uint64 MaxMTU() uint64
@ -50,6 +50,7 @@ type TunAdapter struct {
name InterfaceName name InterfaceName
mtu InterfaceMTU mtu InterfaceMTU
} }
ch chan []byte
} }
// Gets the maximum supported MTU for the platform based on the defaults in // Gets the maximum supported MTU for the platform based on the defaults in
@ -161,6 +162,8 @@ func (tun *TunAdapter) _start() error {
tun.rwc.SetMTU(tun.MTU()) tun.rwc.SetMTU(tun.MTU())
tun.isOpen = true tun.isOpen = true
tun.isEnabled = true tun.isEnabled = true
tun.ch = make(chan []byte, tun.idealBatchSize())
go tun.queue()
go tun.read() go tun.read()
go tun.write() go tun.write()
return nil return nil
@ -194,3 +197,12 @@ func (tun *TunAdapter) _stop() error {
} }
return nil return nil
} }
const bufPoolSize = TUN_OFFSET_BYTES + 65535
var bufPool = sync.Pool{
New: func() any {
b := [bufPoolSize]byte{}
return b[:]
},
}