mirror of
https://github.com/yggdrasil-network/yggdrasil-go.git
synced 2025-04-29 14:45:07 +03:00
eliminate most sync.Pool use, gives a safer but slightly slower interface
This commit is contained in:
parent
9d0969db2b
commit
6d89570860
14 changed files with 53 additions and 96 deletions
|
@ -252,7 +252,6 @@ func (c *Conn) Read(b []byte) (int, error) {
|
|||
}
|
||||
// Copy results to the output slice and clean up
|
||||
copy(b, bs)
|
||||
util.PutBytes(bs)
|
||||
// Return the number of bytes copied to the slice, along with any error
|
||||
return n, err
|
||||
}
|
||||
|
@ -323,10 +322,11 @@ func (c *Conn) writeNoCopy(msg FlowKeyMessage) error {
|
|||
// returned.
|
||||
func (c *Conn) Write(b []byte) (int, error) {
|
||||
written := len(b)
|
||||
msg := FlowKeyMessage{Message: append(util.GetBytes(), b...)}
|
||||
bs := make([]byte, 0, len(b)+crypto.BoxOverhead)
|
||||
bs = append(bs, b...)
|
||||
msg := FlowKeyMessage{Message: bs}
|
||||
err := c.writeNoCopy(msg)
|
||||
if err != nil {
|
||||
util.PutBytes(msg.Message)
|
||||
written = 0
|
||||
}
|
||||
return written, err
|
||||
|
|
|
@ -406,10 +406,6 @@ func (w *linkWriter) sendFrom(from phony.Actor, bss [][]byte, isLinkTraffic bool
|
|||
w.intf.notifySending(size, isLinkTraffic)
|
||||
w.intf.msgIO.writeMsgs(bss)
|
||||
w.intf.notifySent(size, isLinkTraffic)
|
||||
// Cleanup
|
||||
for _, bs := range bss {
|
||||
util.PutBytes(bs)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
|
|
|
@ -2,8 +2,6 @@ package yggdrasil
|
|||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/yggdrasil-network/yggdrasil-go/src/util"
|
||||
)
|
||||
|
||||
// TODO take max size from config
|
||||
|
@ -59,7 +57,7 @@ func (q *packetQueue) cleanup() {
|
|||
worstStream.infos = worstStream.infos[1:]
|
||||
worstStream.size -= uint64(len(packet))
|
||||
q.size -= uint64(len(packet))
|
||||
util.PutBytes(packet)
|
||||
pool_putBytes(packet)
|
||||
// save the modified stream to queues
|
||||
if len(worstStream.infos) > 0 {
|
||||
q.streams[worst] = worstStream
|
||||
|
|
|
@ -9,7 +9,6 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/yggdrasil-network/yggdrasil-go/src/crypto"
|
||||
"github.com/yggdrasil-network/yggdrasil-go/src/util"
|
||||
|
||||
"github.com/Arceliar/phony"
|
||||
)
|
||||
|
@ -241,7 +240,6 @@ func (p *peer) _handlePacket(packet []byte) {
|
|||
case wire_LinkProtocolTraffic:
|
||||
p._handleLinkTraffic(packet)
|
||||
default:
|
||||
util.PutBytes(packet)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -347,7 +345,6 @@ func (p *peer) _handleLinkTraffic(bs []byte) {
|
|||
case wire_SwitchMsg:
|
||||
p._handleSwitchMsg(payload)
|
||||
default:
|
||||
util.PutBytes(bs)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
20
src/yggdrasil/pool.go
Normal file
20
src/yggdrasil/pool.go
Normal file
|
@ -0,0 +1,20 @@
|
|||
package yggdrasil
|
||||
|
||||
import "sync"
|
||||
|
||||
// Used internally to reduce allocations in the hot loop
|
||||
// I.e. packets being switched or between the crypto and the switch
|
||||
// For safety reasons, these must not escape this package
|
||||
var pool = sync.Pool{New: func() interface{} { return []byte(nil) }}
|
||||
|
||||
func pool_getBytes(size int) []byte {
|
||||
bs := pool.Get().([]byte)
|
||||
if cap(bs) < size {
|
||||
bs = make([]byte, size)
|
||||
}
|
||||
return bs[:size]
|
||||
}
|
||||
|
||||
func pool_putBytes(bs []byte) {
|
||||
pool.Put(bs)
|
||||
}
|
|
@ -29,7 +29,6 @@ import (
|
|||
|
||||
"github.com/yggdrasil-network/yggdrasil-go/src/address"
|
||||
"github.com/yggdrasil-network/yggdrasil-go/src/crypto"
|
||||
"github.com/yggdrasil-network/yggdrasil-go/src/util"
|
||||
|
||||
"github.com/Arceliar/phony"
|
||||
)
|
||||
|
@ -178,14 +177,12 @@ func (r *router) _handlePacket(packet []byte) {
|
|||
// Handles incoming traffic, i.e. encapuslated ordinary IPv6 packets.
|
||||
// Passes them to the crypto session worker to be decrypted and sent to the adapter.
|
||||
func (r *router) _handleTraffic(packet []byte) {
|
||||
defer util.PutBytes(packet)
|
||||
p := wire_trafficPacket{}
|
||||
if !p.decode(packet) {
|
||||
return
|
||||
}
|
||||
sinfo, isIn := r.sessions.getSessionForHandle(&p.Handle)
|
||||
if !isIn {
|
||||
util.PutBytes(p.Payload)
|
||||
return
|
||||
}
|
||||
sinfo.recv(r, &p)
|
||||
|
@ -231,7 +228,6 @@ func (r *router) _handleProto(packet []byte) {
|
|||
case wire_DHTLookupResponse:
|
||||
r._handleDHTRes(bs, &p.FromKey)
|
||||
default:
|
||||
util.PutBytes(packet)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -448,12 +448,9 @@ func (sinfo *sessionInfo) _recvPacket(p *wire_trafficPacket) {
|
|||
select {
|
||||
case <-sinfo.init:
|
||||
default:
|
||||
// TODO find a better way to drop things until initialized
|
||||
util.PutBytes(p.Payload)
|
||||
return
|
||||
}
|
||||
if !sinfo._nonceIsOK(&p.Nonce) {
|
||||
util.PutBytes(p.Payload)
|
||||
return
|
||||
}
|
||||
k := sinfo.sharedSesKey
|
||||
|
@ -463,11 +460,9 @@ func (sinfo *sessionInfo) _recvPacket(p *wire_trafficPacket) {
|
|||
poolFunc := func() {
|
||||
bs, isOK = crypto.BoxOpen(&k, p.Payload, &p.Nonce)
|
||||
callback := func() {
|
||||
util.PutBytes(p.Payload)
|
||||
if !isOK || k != sinfo.sharedSesKey || !sinfo._nonceIsOK(&p.Nonce) {
|
||||
// Either we failed to decrypt, or the session was updated, or we
|
||||
// received this packet in the mean time
|
||||
util.PutBytes(bs)
|
||||
return
|
||||
}
|
||||
sinfo._updateNonce(&p.Nonce)
|
||||
|
@ -485,8 +480,6 @@ func (sinfo *sessionInfo) _send(msg FlowKeyMessage) {
|
|||
select {
|
||||
case <-sinfo.init:
|
||||
default:
|
||||
// TODO find a better way to drop things until initialized
|
||||
util.PutBytes(msg.Message)
|
||||
return
|
||||
}
|
||||
sinfo.bytesSent += uint64(len(msg.Message))
|
||||
|
@ -505,14 +498,8 @@ func (sinfo *sessionInfo) _send(msg FlowKeyMessage) {
|
|||
ch := make(chan func(), 1)
|
||||
poolFunc := func() {
|
||||
p.Payload, _ = crypto.BoxSeal(&k, msg.Message, &p.Nonce)
|
||||
packet := p.encode()
|
||||
callback := func() {
|
||||
// Encoding may block on a util.GetBytes(), so kept out of the worker pool
|
||||
packet := p.encode()
|
||||
// Cleanup
|
||||
util.PutBytes(msg.Message)
|
||||
util.PutBytes(p.Payload)
|
||||
// Send the packet
|
||||
// TODO replace this with a send to the peer struct if that becomes an actor
|
||||
sinfo.sessions.router.Act(sinfo, func() {
|
||||
sinfo.sessions.router.out(packet)
|
||||
})
|
||||
|
|
|
@ -6,8 +6,6 @@ import (
|
|||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
|
||||
"github.com/yggdrasil-network/yggdrasil-go/src/util"
|
||||
)
|
||||
|
||||
// Test that this matches the interface we expect
|
||||
|
@ -46,6 +44,9 @@ func (s *stream) writeMsgs(bss [][]byte) (int, error) {
|
|||
}
|
||||
s.outputBuffer = buf[:0] // So we can reuse the same underlying array later
|
||||
_, err := buf.WriteTo(s.rwc)
|
||||
for _, bs := range bss {
|
||||
pool_putBytes(bs)
|
||||
}
|
||||
// TODO only include number of bytes from bs *successfully* written?
|
||||
return written, err
|
||||
}
|
||||
|
@ -112,7 +113,7 @@ func (s *stream) readMsgFromBuffer() ([]byte, error) {
|
|||
if msgLen > streamMsgSize {
|
||||
return nil, errors.New("oversized message")
|
||||
}
|
||||
msg := util.ResizeBytes(util.GetBytes(), int(msgLen))
|
||||
msg := pool_getBytes(int(msgLen))
|
||||
_, err = io.ReadFull(s.inputBuffer, msg)
|
||||
return msg, err
|
||||
}
|
||||
|
|
|
@ -9,7 +9,6 @@ package yggdrasil
|
|||
|
||||
import (
|
||||
"github.com/yggdrasil-network/yggdrasil-go/src/crypto"
|
||||
"github.com/yggdrasil-network/yggdrasil-go/src/util"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -230,8 +229,9 @@ type wire_trafficPacket struct {
|
|||
}
|
||||
|
||||
// Encodes a wire_trafficPacket into its wire format.
|
||||
// The returned slice was taken from the pool.
|
||||
func (p *wire_trafficPacket) encode() []byte {
|
||||
bs := util.GetBytes()
|
||||
bs := pool_getBytes(0)
|
||||
bs = wire_put_uint64(wire_Traffic, bs)
|
||||
bs = wire_put_coords(p.Coords, bs)
|
||||
bs = append(bs, p.Handle[:]...)
|
||||
|
@ -241,7 +241,9 @@ func (p *wire_trafficPacket) encode() []byte {
|
|||
}
|
||||
|
||||
// Decodes an encoded wire_trafficPacket into the struct, returning true if successful.
|
||||
// Either way, the argument slice is added to the pool.
|
||||
func (p *wire_trafficPacket) decode(bs []byte) bool {
|
||||
defer pool_putBytes(bs)
|
||||
var pType uint64
|
||||
switch {
|
||||
case !wire_chop_uint64(&pType, &bs):
|
||||
|
@ -255,7 +257,7 @@ func (p *wire_trafficPacket) decode(bs []byte) bool {
|
|||
case !wire_chop_slice(p.Nonce[:], &bs):
|
||||
return false
|
||||
}
|
||||
p.Payload = append(util.GetBytes(), bs...)
|
||||
p.Payload = append(p.Payload, bs...)
|
||||
return true
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue