Read callback, other tweaks

This commit is contained in:
Neil Alexander 2020-07-01 20:17:38 +01:00
parent 41ce47d6d7
commit 4dfddd804f
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
4 changed files with 87 additions and 33 deletions

View file

@ -43,7 +43,7 @@ type TunAdapter struct {
config *config.NodeState config *config.NodeState
log *log.Logger log *log.Logger
reconfigure chan chan error reconfigure chan chan error
packetConn net.PacketConn packetConn *yggdrasil.PacketConn
addr address.Address addr address.Address
subnet address.Subnet subnet address.Subnet
addrToBoxPubKey map[address.Address]*crypto.BoxPubKey addrToBoxPubKey map[address.Address]*crypto.BoxPubKey
@ -58,7 +58,7 @@ type TunAdapter struct {
} }
type TunOptions struct { type TunOptions struct {
PacketConn net.PacketConn PacketConn *yggdrasil.PacketConn
} }
// Gets the maximum supported MTU for the platform based on the defaults in // Gets the maximum supported MTU for the platform based on the defaults in
@ -168,18 +168,9 @@ func (tun *TunAdapter) _start() error {
tun.isOpen = true tun.isOpen = true
tun.reader.Act(nil, tun.reader._read) // Start the reader tun.reader.Act(nil, tun.reader._read) // Start the reader
tun.ckr.init(tun) tun.ckr.init(tun)
go func() { tun.packetConn.SetReadCallback(func(addr net.Addr, bs []byte) {
// TODO: put this somewhere more elegant tun.writer.writeFrom(nil, bs)
buf := make([]byte, 65535) })
for {
n, _, err := tun.packetConn.ReadFrom(buf)
if err != nil {
log.Errorln("tun.packetConn.ReadFrom:", err)
continue
}
tun.writer.writeFrom(nil, buf[:n])
}
}()
return nil return nil
} }

View file

@ -228,7 +228,7 @@ func (c *Core) GetSessions() []Session {
// PacketConn returns a net.PacketConn which can be used to send and receive // PacketConn returns a net.PacketConn which can be used to send and receive
// information over Yggdrasil sessions. // information over Yggdrasil sessions.
func (c *Core) PacketConn() net.PacketConn { func (c *Core) PacketConn() *PacketConn {
return c.router.sessions.packetConn return c.router.sessions.packetConn
} }

View file

@ -11,16 +11,19 @@ import (
) )
type packet struct { type packet struct {
addr *crypto.BoxPubKey addr net.Addr
payload []byte payload []byte
} }
type PacketConn struct { type PacketConn struct {
phony.Inbox phony.Inbox
net.PacketConn net.PacketConn
closed bool sessions *sessions
readBuffer chan packet closed bool
sessions *sessions readCallback func(net.Addr, []byte)
readBuffer chan packet
readDeadline *time.Time
writeDeadline *time.Time
} }
func newPacketConn(ss *sessions) *PacketConn { func newPacketConn(ss *sessions) *PacketConn {
@ -30,9 +33,23 @@ func newPacketConn(ss *sessions) *PacketConn {
} }
} }
func (c *PacketConn) _sendToReader(addr net.Addr, payload []byte) {
if c.readCallback == nil {
c.readBuffer <- packet{
addr: addr,
payload: payload,
}
} else {
c.readCallback(addr, payload)
}
}
// implements net.PacketConn // implements net.PacketConn
func (c *PacketConn) ReadFrom(b []byte) (n int, addr net.Addr, err error) { func (c *PacketConn) ReadFrom(b []byte) (n int, addr net.Addr, err error) {
if c.closed { if c.readCallback != nil {
return 0, nil, errors.New("read callback is configured")
}
if c.closed { // TODO: unsafe?
return 0, nil, PacketConnError{closed: true} return 0, nil, PacketConnError{closed: true}
} }
packet := <-c.readBuffer packet := <-c.readBuffer
@ -42,19 +59,23 @@ func (c *PacketConn) ReadFrom(b []byte) (n int, addr net.Addr, err error) {
// implements net.PacketConn // implements net.PacketConn
func (c *PacketConn) WriteTo(b []byte, addr net.Addr) (int, error) { func (c *PacketConn) WriteTo(b []byte, addr net.Addr) (int, error) {
if c.closed { if c.closed { // TODO: unsafe?
return 0, PacketConnError{closed: true} return 0, PacketConnError{closed: true}
} }
boxPubKey, ok := addr.(*crypto.BoxPubKey) boxPubKey, ok := addr.(*crypto.BoxPubKey)
if !ok { if !ok {
return 0, errors.New("expected crypto.BoxPubKey as net.Addr") return 0, errors.New("expected *crypto.BoxPubKey as net.Addr")
} }
session, ok := c.sessions.getByTheirPerm(boxPubKey) var session *sessionInfo
if !ok { phony.Block(c.sessions.router, func() {
session = c.sessions.createSession(boxPubKey) var ok bool
} session, ok = c.sessions.getByTheirPerm(boxPubKey)
if !ok {
session = c.sessions.createSession(boxPubKey)
}
})
if session == nil { if session == nil {
return 0, errors.New("expected a session but there was none") return 0, errors.New("expected a session but there was none")
} }
@ -112,6 +133,7 @@ func (c *PacketConn) WriteTo(b []byte, addr net.Addr) (int, error) {
// implements net.PacketConn // implements net.PacketConn
func (c *PacketConn) Close() error { func (c *PacketConn) Close() error {
// TODO: implement this. don't know what makes sense for net.PacketConn?
return nil return nil
} }
@ -120,18 +142,60 @@ func (c *PacketConn) LocalAddr() net.Addr {
return &c.sessions.router.core.boxPub return &c.sessions.router.core.boxPub
} }
// implements net.PacketConn // SetReadCallback allows you to specify a function that will be called whenever
// a packet is received. This should be used if you wish to implement
// asynchronous patterns for receiving data from the remote node.
//
// Note that if a read callback has been supplied, you should no longer attempt
// to use the synchronous Read function.
func (c *PacketConn) SetReadCallback(callback func(net.Addr, []byte)) {
c.Act(nil, func() {
c.readCallback = callback
c._drainReadBuffer()
})
}
func (c *PacketConn) _drainReadBuffer() {
if c.readCallback == nil {
return
}
select {
case bs := <-c.readBuffer:
c.readCallback(bs.addr, bs.payload)
c.Act(nil, c._drainReadBuffer) // In case there's more
default:
}
}
// SetDeadline is equivalent to calling both SetReadDeadline and
// SetWriteDeadline with the same value, configuring the maximum amount of time
// that synchronous Read and Write operations can block for. If no deadline is
// configured, Read and Write operations can potentially block indefinitely.
func (c *PacketConn) SetDeadline(t time.Time) error { func (c *PacketConn) SetDeadline(t time.Time) error {
c.SetReadDeadline(t)
c.SetWriteDeadline(t)
return nil return nil
} }
// implements net.PacketConn // SetReadDeadline configures the maximum amount of time that a synchronous Read
// operation can block for. A Read operation will unblock at the point that the
// read deadline is reached if no other condition (such as data arrival or
// connection closure) happens first. If no deadline is configured, Read
// operations can potentially block indefinitely.
func (c *PacketConn) SetReadDeadline(t time.Time) error { func (c *PacketConn) SetReadDeadline(t time.Time) error {
// TODO warn that this can block while waiting for the Conn actor to run, so don't call it from other actors...
phony.Block(c, func() { c.readDeadline = &t })
return nil return nil
} }
// implements net.PacketConn // SetWriteDeadline configures the maximum amount of time that a synchronous
// Write operation can block for. A Write operation will unblock at the point
// that the read deadline is reached if no other condition (such as data sending
// or connection closure) happens first. If no deadline is configured, Write
// operations can potentially block indefinitely.
func (c *PacketConn) SetWriteDeadline(t time.Time) error { func (c *PacketConn) SetWriteDeadline(t time.Time) error {
// TODO warn that this can block while waiting for the Conn actor to run, so don't call it from other actors...
phony.Block(c, func() { c.writeDeadline = &t })
return nil return nil
} }

View file

@ -447,10 +447,9 @@ func (sinfo *sessionInfo) _recvPacket(p *wire_trafficPacket) {
} }
sinfo._updateNonce(&p.Nonce) sinfo._updateNonce(&p.Nonce)
sinfo.bytesRecvd += uint64(len(bs)) sinfo.bytesRecvd += uint64(len(bs))
sinfo.sessions.packetConn.readBuffer <- packet{ sinfo.sessions.packetConn.Act(sinfo, func() {
addr: &sinfo.theirPermPub, sinfo.sessions.packetConn._sendToReader(&sinfo.theirPermPub, bs)
payload: bs, })
}
} }
ch <- callback ch <- callback
sinfo.checkCallbacks() sinfo.checkCallbacks()