diff --git a/src/tuntap/tun.go b/src/tuntap/tun.go index 3d6be276..d17f39ba 100644 --- a/src/tuntap/tun.go +++ b/src/tuntap/tun.go @@ -43,7 +43,7 @@ type TunAdapter struct { config *config.NodeState log *log.Logger reconfigure chan chan error - packetConn net.PacketConn + packetConn *yggdrasil.PacketConn addr address.Address subnet address.Subnet addrToBoxPubKey map[address.Address]*crypto.BoxPubKey @@ -58,7 +58,7 @@ type TunAdapter struct { } type TunOptions struct { - PacketConn net.PacketConn + PacketConn *yggdrasil.PacketConn } // Gets the maximum supported MTU for the platform based on the defaults in @@ -168,18 +168,9 @@ func (tun *TunAdapter) _start() error { tun.isOpen = true tun.reader.Act(nil, tun.reader._read) // Start the reader tun.ckr.init(tun) - go func() { - // TODO: put this somewhere more elegant - buf := make([]byte, 65535) - for { - n, _, err := tun.packetConn.ReadFrom(buf) - if err != nil { - log.Errorln("tun.packetConn.ReadFrom:", err) - continue - } - tun.writer.writeFrom(nil, buf[:n]) - } - }() + tun.packetConn.SetReadCallback(func(addr net.Addr, bs []byte) { + tun.writer.writeFrom(nil, bs) + }) return nil } diff --git a/src/yggdrasil/api.go b/src/yggdrasil/api.go index c02c52c7..328c1b9e 100644 --- a/src/yggdrasil/api.go +++ b/src/yggdrasil/api.go @@ -228,7 +228,7 @@ func (c *Core) GetSessions() []Session { // PacketConn returns a net.PacketConn which can be used to send and receive // information over Yggdrasil sessions. -func (c *Core) PacketConn() net.PacketConn { +func (c *Core) PacketConn() *PacketConn { return c.router.sessions.packetConn } diff --git a/src/yggdrasil/packetconn.go b/src/yggdrasil/packetconn.go index 43c75a8d..0e2a41d4 100644 --- a/src/yggdrasil/packetconn.go +++ b/src/yggdrasil/packetconn.go @@ -11,16 +11,19 @@ import ( ) type packet struct { - addr *crypto.BoxPubKey + addr net.Addr payload []byte } type PacketConn struct { phony.Inbox net.PacketConn - closed bool - readBuffer chan packet - sessions *sessions + sessions *sessions + closed bool + readCallback func(net.Addr, []byte) + readBuffer chan packet + readDeadline *time.Time + writeDeadline *time.Time } func newPacketConn(ss *sessions) *PacketConn { @@ -30,9 +33,23 @@ func newPacketConn(ss *sessions) *PacketConn { } } +func (c *PacketConn) _sendToReader(addr net.Addr, payload []byte) { + if c.readCallback == nil { + c.readBuffer <- packet{ + addr: addr, + payload: payload, + } + } else { + c.readCallback(addr, payload) + } +} + // implements net.PacketConn func (c *PacketConn) ReadFrom(b []byte) (n int, addr net.Addr, err error) { - if c.closed { + if c.readCallback != nil { + return 0, nil, errors.New("read callback is configured") + } + if c.closed { // TODO: unsafe? return 0, nil, PacketConnError{closed: true} } packet := <-c.readBuffer @@ -42,19 +59,23 @@ func (c *PacketConn) ReadFrom(b []byte) (n int, addr net.Addr, err error) { // implements net.PacketConn func (c *PacketConn) WriteTo(b []byte, addr net.Addr) (int, error) { - if c.closed { + if c.closed { // TODO: unsafe? return 0, PacketConnError{closed: true} } boxPubKey, ok := addr.(*crypto.BoxPubKey) if !ok { - return 0, errors.New("expected crypto.BoxPubKey as net.Addr") + return 0, errors.New("expected *crypto.BoxPubKey as net.Addr") } - session, ok := c.sessions.getByTheirPerm(boxPubKey) - if !ok { - session = c.sessions.createSession(boxPubKey) - } + var session *sessionInfo + phony.Block(c.sessions.router, func() { + var ok bool + session, ok = c.sessions.getByTheirPerm(boxPubKey) + if !ok { + session = c.sessions.createSession(boxPubKey) + } + }) if session == nil { return 0, errors.New("expected a session but there was none") } @@ -112,6 +133,7 @@ func (c *PacketConn) WriteTo(b []byte, addr net.Addr) (int, error) { // implements net.PacketConn func (c *PacketConn) Close() error { + // TODO: implement this. don't know what makes sense for net.PacketConn? return nil } @@ -120,18 +142,60 @@ func (c *PacketConn) LocalAddr() net.Addr { return &c.sessions.router.core.boxPub } -// implements net.PacketConn +// SetReadCallback allows you to specify a function that will be called whenever +// a packet is received. This should be used if you wish to implement +// asynchronous patterns for receiving data from the remote node. +// +// Note that if a read callback has been supplied, you should no longer attempt +// to use the synchronous Read function. +func (c *PacketConn) SetReadCallback(callback func(net.Addr, []byte)) { + c.Act(nil, func() { + c.readCallback = callback + c._drainReadBuffer() + }) +} + +func (c *PacketConn) _drainReadBuffer() { + if c.readCallback == nil { + return + } + select { + case bs := <-c.readBuffer: + c.readCallback(bs.addr, bs.payload) + c.Act(nil, c._drainReadBuffer) // In case there's more + default: + } +} + +// SetDeadline is equivalent to calling both SetReadDeadline and +// SetWriteDeadline with the same value, configuring the maximum amount of time +// that synchronous Read and Write operations can block for. If no deadline is +// configured, Read and Write operations can potentially block indefinitely. func (c *PacketConn) SetDeadline(t time.Time) error { + c.SetReadDeadline(t) + c.SetWriteDeadline(t) return nil } -// implements net.PacketConn +// SetReadDeadline configures the maximum amount of time that a synchronous Read +// operation can block for. A Read operation will unblock at the point that the +// read deadline is reached if no other condition (such as data arrival or +// connection closure) happens first. If no deadline is configured, Read +// operations can potentially block indefinitely. func (c *PacketConn) SetReadDeadline(t time.Time) error { + // TODO warn that this can block while waiting for the Conn actor to run, so don't call it from other actors... + phony.Block(c, func() { c.readDeadline = &t }) return nil } -// implements net.PacketConn +// SetWriteDeadline configures the maximum amount of time that a synchronous +// Write operation can block for. A Write operation will unblock at the point +// that the read deadline is reached if no other condition (such as data sending +// or connection closure) happens first. If no deadline is configured, Write +// operations can potentially block indefinitely. func (c *PacketConn) SetWriteDeadline(t time.Time) error { + // TODO warn that this can block while waiting for the Conn actor to run, so don't call it from other actors... + phony.Block(c, func() { c.writeDeadline = &t }) return nil } diff --git a/src/yggdrasil/session.go b/src/yggdrasil/session.go index 16828052..9b5d17c2 100644 --- a/src/yggdrasil/session.go +++ b/src/yggdrasil/session.go @@ -447,10 +447,9 @@ func (sinfo *sessionInfo) _recvPacket(p *wire_trafficPacket) { } sinfo._updateNonce(&p.Nonce) sinfo.bytesRecvd += uint64(len(bs)) - sinfo.sessions.packetConn.readBuffer <- packet{ - addr: &sinfo.theirPermPub, - payload: bs, - } + sinfo.sessions.packetConn.Act(sinfo, func() { + sinfo.sessions.packetConn._sendToReader(&sinfo.theirPermPub, bs) + }) } ch <- callback sinfo.checkCallbacks()