Vectorise reads from IPv6 RWC

This commit is contained in:
Neil Alexander 2024-07-17 23:30:38 +01:00
parent 0fb5d76e30
commit c38544014c
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
3 changed files with 64 additions and 11 deletions

View file

@ -4,6 +4,7 @@ import (
"crypto/ed25519" "crypto/ed25519"
"errors" "errors"
"fmt" "fmt"
"io"
"net" "net"
"sync" "sync"
"time" "time"
@ -15,6 +16,7 @@ import (
"github.com/yggdrasil-network/yggdrasil-go/src/address" "github.com/yggdrasil-network/yggdrasil-go/src/address"
"github.com/yggdrasil-network/yggdrasil-go/src/core" "github.com/yggdrasil-network/yggdrasil-go/src/core"
"github.com/yggdrasil-network/yggdrasil-go/src/tun"
) )
const keyStoreTimeout = 2 * time.Minute const keyStoreTimeout = 2 * time.Minute
@ -337,11 +339,40 @@ func (k *keyStore) MTU() uint64 {
type ReadWriteCloser struct { type ReadWriteCloser struct {
keyStore keyStore
ch chan []byte
errch chan error
}
const bufPoolSize = tun.TUN_OFFSET_BYTES + 65535
var bufPool = sync.Pool{
New: func() any {
b := [bufPoolSize]byte{}
return b[:]
},
} }
func NewReadWriteCloser(c *core.Core) *ReadWriteCloser { func NewReadWriteCloser(c *core.Core) *ReadWriteCloser {
rwc := new(ReadWriteCloser) rwc := new(ReadWriteCloser)
rwc.init(c) rwc.init(c)
rwc.ch = make(chan []byte, tun.TUN_MAX_VECTOR)
rwc.errch = make(chan error, 1)
go func() {
for {
p := bufPool.Get().([]byte)[:bufPoolSize]
n, err := rwc.readPC(p[:])
if err != nil || n == 0 {
if err == nil {
err = io.EOF
}
rwc.errch <- err
close(rwc.errch)
close(rwc.ch)
return
}
rwc.ch <- p[:n]
}
}()
return rwc return rwc
} }
@ -354,7 +385,32 @@ func (rwc *ReadWriteCloser) Subnet() address.Subnet {
} }
func (rwc *ReadWriteCloser) Read(p []byte) (n int, err error) { func (rwc *ReadWriteCloser) Read(p []byte) (n int, err error) {
return rwc.readPC(p) msg := <-rwc.ch
if msg == nil {
return 0, <-rwc.errch
}
return copy(p, msg), nil
}
func (rwc *ReadWriteCloser) ReadMany(b [][]byte, sizes []int, offset int) (c int, err error) {
var lb int
if c, lb = len(rwc.ch), len(b); c > lb {
c = lb
}
if c == 0 {
// If nothing is waiting yet then we should block
// for the next packet only.
c = 1
}
for i := 0; i < c; i++ {
msg := <-rwc.ch
if msg == nil {
return i, <-rwc.errch
}
sizes[i] = offset + copy(b[i][offset:], msg)
bufPool.Put(msg) // nolint:staticcheck
}
return
} }
func (rwc *ReadWriteCloser) Write(p []byte) (n int, err error) { func (rwc *ReadWriteCloser) Write(p []byte) (n int, err error) {

View file

@ -1,9 +1,5 @@
package tun package tun
import (
"net"
)
const TUN_OFFSET_BYTES = 80 // sizeof(virtio_net_hdr) const TUN_OFFSET_BYTES = 80 // sizeof(virtio_net_hdr)
const TUN_MAX_VECTOR = 16 const TUN_MAX_VECTOR = 16
@ -16,7 +12,7 @@ func (tun *TunAdapter) idealBatchSize() int {
func (tun *TunAdapter) read() { func (tun *TunAdapter) read() {
vs := tun.idealBatchSize() vs := tun.idealBatchSize()
bufs := make(net.Buffers, vs) bufs := make([][]byte, vs)
sizes := make([]int, vs) sizes := make([]int, vs)
for i := range bufs { for i := range bufs {
bufs[i] = make([]byte, TUN_OFFSET_BYTES+65535) bufs[i] = make([]byte, TUN_OFFSET_BYTES+65535)
@ -36,13 +32,14 @@ func (tun *TunAdapter) read() {
} }
func (tun *TunAdapter) write() { func (tun *TunAdapter) write() {
vs := 1 // One at a time for now... eventually use tun.idealBatchSize() vs := tun.idealBatchSize()
bufs := make(net.Buffers, vs) bufs := make([][]byte, vs)
sizes := make([]int, vs)
for i := range bufs { for i := range bufs {
bufs[i] = make([]byte, TUN_OFFSET_BYTES+65535) bufs[i] = make([]byte, TUN_OFFSET_BYTES+65535)
} }
for { for {
n, err := tun.rwc.Read(bufs[0][TUN_OFFSET_BYTES : TUN_OFFSET_BYTES+65535]) n, err := tun.rwc.ReadMany(bufs, sizes, TUN_OFFSET_BYTES)
if err != nil { if err != nil {
tun.log.Errorln("Exiting TUN writer due to core read error:", err) tun.log.Errorln("Exiting TUN writer due to core read error:", err)
return return
@ -50,8 +47,7 @@ func (tun *TunAdapter) write() {
if !tun.isEnabled { if !tun.isEnabled {
continue // Nothing to do, the tun isn't enabled continue // Nothing to do, the tun isn't enabled
} }
bufs[0] = bufs[0][:TUN_OFFSET_BYTES+n] if _, err = tun.iface.Write(bufs[:n], TUN_OFFSET_BYTES); err != nil {
if _, err = tun.iface.Write(bufs, TUN_OFFSET_BYTES); err != nil {
tun.Act(nil, func() { tun.Act(nil, func() {
if !tun.isOpen { if !tun.isOpen {
tun.log.Errorln("TUN iface write error:", err) tun.log.Errorln("TUN iface write error:", err)

View file

@ -24,6 +24,7 @@ type MTU uint16
type ReadWriteCloser interface { type ReadWriteCloser interface {
io.ReadWriteCloser io.ReadWriteCloser
ReadMany([][]byte, []int, int) (int, error) // Vectorised reads
Address() address.Address Address() address.Address
Subnet() address.Subnet Subnet() address.Subnet
MaxMTU() uint64 MaxMTU() uint64