Initial work on net.PacketConn

This commit is contained in:
Neil Alexander 2020-07-01 19:18:50 +01:00
parent aec82d7a39
commit aed13e6d50
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
11 changed files with 321 additions and 1006 deletions

View file

@ -297,19 +297,11 @@ func main() {
} }
n.multicast.SetupAdminHandlers(n.admin.(*admin.AdminSocket)) n.multicast.SetupAdminHandlers(n.admin.(*admin.AdminSocket))
// Start the TUN/TAP interface // Start the TUN/TAP interface
if listener, err := n.core.ConnListen(); err == nil { n.tuntap.Init(&n.core, n.state, logger, tuntap.TunOptions{PacketConn: n.core.PacketConn()})
if dialer, err := n.core.ConnDialer(); err == nil { if err := n.tuntap.Start(); err != nil {
n.tuntap.Init(&n.core, n.state, logger, tuntap.TunOptions{Listener: listener, Dialer: dialer}) logger.Errorln("An error occurred starting TUN/TAP:", err)
if err := n.tuntap.Start(); err != nil {
logger.Errorln("An error occurred starting TUN/TAP:", err)
}
n.tuntap.SetupAdminHandlers(n.admin.(*admin.AdminSocket))
} else {
logger.Errorln("Unable to get Dialer:", err)
}
} else {
logger.Errorln("Unable to get Listener:", err)
} }
n.tuntap.SetupAdminHandlers(n.admin.(*admin.AdminSocket))
// Make some nice output that tells us what our IPv6 address and subnet are. // Make some nice output that tells us what our IPv6 address and subnet are.
// This is just logged to stdout for the user. // This is just logged to stdout for the user.
address := n.core.Address() address := n.core.Address()

View file

@ -1,227 +0,0 @@
package tuntap
import (
"bytes"
"errors"
"time"
"github.com/Arceliar/phony"
"github.com/yggdrasil-network/yggdrasil-go/src/address"
"github.com/yggdrasil-network/yggdrasil-go/src/crypto"
"github.com/yggdrasil-network/yggdrasil-go/src/util"
"github.com/yggdrasil-network/yggdrasil-go/src/yggdrasil"
"golang.org/x/net/icmp"
"golang.org/x/net/ipv6"
)
const tunConnTimeout = 2 * time.Minute
type tunConn struct {
phony.Inbox
tun *TunAdapter
conn *yggdrasil.Conn
addr address.Address
snet address.Subnet
stop chan struct{}
alive *time.Timer // From calling time.AfterFunc
}
func (s *tunConn) close() {
s.tun.Act(s, s._close_from_tun)
}
func (s *tunConn) _close_from_tun() {
go s.conn.Close() // Just in case it blocks on actor operations
delete(s.tun.addrToConn, s.addr)
delete(s.tun.subnetToConn, s.snet)
func() {
defer func() { recover() }()
close(s.stop) // Closes reader/writer goroutines
}()
}
func (s *tunConn) _read(bs []byte) (err error) {
select {
case <-s.stop:
err = errors.New("session was already closed")
return
default:
}
if len(bs) == 0 {
err = errors.New("read packet with 0 size")
return
}
ipv4 := len(bs) > 20 && bs[0]&0xf0 == 0x40
ipv6 := len(bs) > 40 && bs[0]&0xf0 == 0x60
isCGA := true
// Check source addresses
switch {
case ipv6 && bs[8] == 0x02 && bytes.Equal(s.addr[:16], bs[8:24]): // source
case ipv6 && bs[8] == 0x03 && bytes.Equal(s.snet[:8], bs[8:16]): // source
default:
isCGA = false
}
// Check destination addresses
switch {
case ipv6 && bs[24] == 0x02 && bytes.Equal(s.tun.addr[:16], bs[24:40]): // destination
case ipv6 && bs[24] == 0x03 && bytes.Equal(s.tun.subnet[:8], bs[24:32]): // destination
default:
isCGA = false
}
// Decide how to handle the packet
var skip bool
switch {
case isCGA: // Allowed
case s.tun.ckr.isEnabled() && (ipv4 || ipv6):
var srcAddr address.Address
var dstAddr address.Address
var addrlen int
if ipv4 {
copy(srcAddr[:], bs[12:16])
copy(dstAddr[:], bs[16:20])
addrlen = 4
}
if ipv6 {
copy(srcAddr[:], bs[8:24])
copy(dstAddr[:], bs[24:40])
addrlen = 16
}
if !s.tun.ckr.isValidLocalAddress(dstAddr, addrlen) {
// The destination address isn't in our CKR allowed range
skip = true
} else if key, err := s.tun.ckr.getPublicKeyForAddress(srcAddr, addrlen); err == nil {
if *s.conn.RemoteAddr().(*crypto.BoxPubKey) == key {
// This is the one allowed CKR case, where source and destination addresses are both good
} else {
// The CKR key associated with this address doesn't match the sender's NodeID
skip = true
}
} else {
// We have no CKR route for this source address
skip = true
}
default:
skip = true
}
if skip {
err = errors.New("address not allowed")
return
}
s.tun.writer.writeFrom(s, bs)
s.stillAlive()
return
}
func (s *tunConn) writeFrom(from phony.Actor, bs []byte) {
s.Act(from, func() {
s._write(bs)
})
}
func (s *tunConn) _write(bs []byte) (err error) {
select {
case <-s.stop:
err = errors.New("session was already closed")
return
default:
}
v4 := len(bs) > 20 && bs[0]&0xf0 == 0x40
v6 := len(bs) > 40 && bs[0]&0xf0 == 0x60
isCGA := true
// Check source addresses
switch {
case v6 && bs[8] == 0x02 && bytes.Equal(s.tun.addr[:16], bs[8:24]): // source
case v6 && bs[8] == 0x03 && bytes.Equal(s.tun.subnet[:8], bs[8:16]): // source
default:
isCGA = false
}
// Check destiantion addresses
switch {
case v6 && bs[24] == 0x02 && bytes.Equal(s.addr[:16], bs[24:40]): // destination
case v6 && bs[24] == 0x03 && bytes.Equal(s.snet[:8], bs[24:32]): // destination
default:
isCGA = false
}
// Decide how to handle the packet
var skip bool
switch {
case isCGA: // Allowed
case s.tun.ckr.isEnabled() && (v4 || v6):
var srcAddr address.Address
var dstAddr address.Address
var addrlen int
if v4 {
copy(srcAddr[:], bs[12:16])
copy(dstAddr[:], bs[16:20])
addrlen = 4
}
if v6 {
copy(srcAddr[:], bs[8:24])
copy(dstAddr[:], bs[24:40])
addrlen = 16
}
if !s.tun.ckr.isValidLocalAddress(srcAddr, addrlen) {
// The source address isn't in our CKR allowed range
skip = true
} else if key, err := s.tun.ckr.getPublicKeyForAddress(dstAddr, addrlen); err == nil {
if *s.conn.RemoteAddr().(*crypto.BoxPubKey) == key {
// This is the one allowed CKR case, where source and destination addresses are both good
} else {
// The CKR key associated with this address doesn't match the sender's NodeID
skip = true
}
} else {
// We have no CKR route for this destination address... why do we have the packet in the first place?
skip = true
}
default:
skip = true
}
if skip {
err = errors.New("address not allowed")
return
}
msg := yggdrasil.FlowKeyMessage{
FlowKey: util.GetFlowKey(bs),
Message: bs,
}
s.conn.WriteFrom(s, msg, func(err error) {
if err == nil {
// No point in wasting resources to send back an error if there was none
return
}
s.Act(s.conn, func() {
if e, eok := err.(yggdrasil.ConnError); !eok {
if e.Closed() {
s.tun.log.Debugln(s.conn.String(), "TUN/TAP generic write debug:", err)
} else {
s.tun.log.Errorln(s.conn.String(), "TUN/TAP generic write error:", err)
}
} else if e.PacketTooBig() {
// TODO: This currently isn't aware of IPv4 for CKR
ptb := &icmp.PacketTooBig{
MTU: int(e.PacketMaximumSize()),
Data: bs[:900],
}
if packet, err := CreateICMPv6(bs[8:24], bs[24:40], ipv6.ICMPTypePacketTooBig, 0, ptb); err == nil {
s.tun.writer.writeFrom(s, packet)
}
} else {
if e.Closed() {
s.tun.log.Debugln(s.conn.String(), "TUN/TAP conn write debug:", err)
} else {
s.tun.log.Errorln(s.conn.String(), "TUN/TAP conn write error:", err)
}
}
})
})
s.stillAlive()
return
}
func (s *tunConn) stillAlive() {
if s.alive != nil {
s.alive.Stop()
}
s.alive = time.AfterFunc(tunConnTimeout, s.close)
}

View file

@ -1,9 +1,11 @@
package tuntap package tuntap
import ( import (
"fmt"
"net"
"github.com/yggdrasil-network/yggdrasil-go/src/address" "github.com/yggdrasil-network/yggdrasil-go/src/address"
"github.com/yggdrasil-network/yggdrasil-go/src/crypto" "github.com/yggdrasil-network/yggdrasil-go/src/crypto"
"github.com/yggdrasil-network/yggdrasil-go/src/yggdrasil"
"github.com/Arceliar/phony" "github.com/Arceliar/phony"
) )
@ -149,60 +151,45 @@ func (tun *TunAdapter) _handlePacket(recvd []byte, err error) {
// Couldn't find this node's ygg IP // Couldn't find this node's ygg IP
return return
} }
// Do we have an active connection for this node address?
var dstString string var boxPubKey *crypto.BoxPubKey
session, isIn := tun.addrToConn[dstAddr] if key, ok := tun.addrToBoxPubKey[dstAddr]; ok {
if !isIn || session == nil { boxPubKey = key
session, isIn = tun.subnetToConn[dstSnet] } else if key, ok := tun.subnetToBoxPubKey[dstSnet]; ok {
if !isIn || session == nil { boxPubKey = key
// Neither an address nor a subnet mapping matched, therefore populate } else {
// the node ID and mask to commence a search var dstNodeID, dstNodeMask *crypto.NodeID
if dstAddr.IsValid() { if dstAddr[0] == 0x02 {
dstString = dstAddr.GetNodeIDLengthString() dstNodeID, dstNodeMask = dstAddr.GetNodeIDandMask()
} else { } else if dstAddr[0] == 0x03 {
dstString = dstSnet.GetNodeIDLengthString() dstNodeID, dstNodeMask = dstSnet.GetNodeIDandMask()
}
} }
if dstNodeID == nil || dstNodeMask == nil {
tun.log.Errorln("Didn't find node ID/mask")
return
}
fmt.Println("Start search for", net.IP(dstAddr[:]).String())
_, boxPubKey, err = tun.core.Resolve(dstNodeID, dstNodeMask)
if err != nil {
tun.log.Errorln("tun.core.Resolve:", err)
return
}
tun.addrToBoxPubKey[dstAddr] = boxPubKey
tun.subnetToBoxPubKey[dstSnet] = boxPubKey
} }
// If we don't have a connection then we should open one
if !isIn || session == nil { if boxPubKey == nil {
// Check we haven't been given empty node ID, really this shouldn't ever tun.log.Errorln("No destination public key found for this packet")
// happen but just to be sure... return
if dstString == "" {
panic("Given empty dstString - this shouldn't happen")
}
_, known := tun.dials[dstString]
tun.dials[dstString] = append(tun.dials[dstString], bs)
for len(tun.dials[dstString]) > 32 {
tun.dials[dstString] = tun.dials[dstString][1:]
}
if !known {
go func() {
conn, err := tun.dialer.Dial("nodeid", dstString)
tun.Act(nil, func() {
packets := tun.dials[dstString]
delete(tun.dials, dstString)
if err != nil {
return
}
// We've been given a connection so prepare the session wrapper
var tc *tunConn
if tc, err = tun._wrap(conn.(*yggdrasil.Conn)); err != nil {
// Something went wrong when storing the connection, typically that
// something already exists for this address or subnet
tun.log.Debugln("TUN iface wrap:", err)
return
}
for _, packet := range packets {
tc.writeFrom(nil, packet)
}
})
return
}()
}
} }
// If we have a connection now, try writing to it
if isIn && session != nil { n, err = tun.packetConn.WriteTo(recvd, boxPubKey)
session.writeFrom(tun, bs) if err != nil {
tun.log.Errorln("tun.packetConn.WriteTo:", err)
}
if n != len(recvd) {
tun.log.Errorln("Expected to send", len(recvd), "bytes but sent", n, "bytes")
} }
} }

View file

@ -37,31 +37,28 @@ const tun_IPv6_HEADER_LENGTH = 40
// should pass this object to the yggdrasil.SetRouterAdapter() function before // should pass this object to the yggdrasil.SetRouterAdapter() function before
// calling yggdrasil.Start(). // calling yggdrasil.Start().
type TunAdapter struct { type TunAdapter struct {
core *yggdrasil.Core core *yggdrasil.Core
writer tunWriter writer tunWriter
reader tunReader reader tunReader
config *config.NodeState config *config.NodeState
log *log.Logger log *log.Logger
reconfigure chan chan error reconfigure chan chan error
listener *yggdrasil.Listener packetConn net.PacketConn
dialer *yggdrasil.Dialer addr address.Address
addr address.Address subnet address.Subnet
subnet address.Subnet addrToBoxPubKey map[address.Address]*crypto.BoxPubKey
ckr cryptokey subnetToBoxPubKey map[address.Subnet]*crypto.BoxPubKey
icmpv6 ICMPv6 ckr cryptokey
mtu MTU icmpv6 ICMPv6
iface tun.Device mtu MTU
phony.Inbox // Currently only used for _handlePacket from the reader, TODO: all the stuff that currently needs a mutex below iface tun.Device
//mutex sync.RWMutex // Protects the below phony.Inbox // Currently only used for _handlePacket from the reader, TODO: all the stuff that currently needs a mutex below
addrToConn map[address.Address]*tunConn dials map[string][][]byte // Buffer of packets to send after dialing finishes
subnetToConn map[address.Subnet]*tunConn isOpen bool
dials map[string][][]byte // Buffer of packets to send after dialing finishes
isOpen bool
} }
type TunOptions struct { type TunOptions struct {
Listener *yggdrasil.Listener PacketConn net.PacketConn
Dialer *yggdrasil.Dialer
} }
// Gets the maximum supported MTU for the platform based on the defaults in // Gets the maximum supported MTU for the platform based on the defaults in
@ -120,10 +117,9 @@ func (tun *TunAdapter) Init(core *yggdrasil.Core, config *config.NodeState, log
tun.core = core tun.core = core
tun.config = config tun.config = config
tun.log = log tun.log = log
tun.listener = tunoptions.Listener tun.packetConn = tunoptions.PacketConn
tun.dialer = tunoptions.Dialer tun.addrToBoxPubKey = make(map[address.Address]*crypto.BoxPubKey)
tun.addrToConn = make(map[address.Address]*tunConn) tun.subnetToBoxPubKey = make(map[address.Subnet]*crypto.BoxPubKey)
tun.subnetToConn = make(map[address.Subnet]*tunConn)
tun.dials = make(map[string][][]byte) tun.dials = make(map[string][][]byte)
tun.writer.tun = tun tun.writer.tun = tun
tun.reader.tun = tun tun.reader.tun = tun
@ -145,7 +141,7 @@ func (tun *TunAdapter) _start() error {
return errors.New("TUN module is already started") return errors.New("TUN module is already started")
} }
current := tun.config.GetCurrent() current := tun.config.GetCurrent()
if tun.config == nil || tun.listener == nil || tun.dialer == nil { if tun.config == nil {
return errors.New("no configuration available to TUN") return errors.New("no configuration available to TUN")
} }
var boxPub crypto.BoxPubKey var boxPub crypto.BoxPubKey
@ -170,9 +166,20 @@ func (tun *TunAdapter) _start() error {
} }
tun.core.SetMaximumSessionMTU(tun.MTU()) tun.core.SetMaximumSessionMTU(tun.MTU())
tun.isOpen = true tun.isOpen = true
go tun.handler()
tun.reader.Act(nil, tun.reader._read) // Start the reader tun.reader.Act(nil, tun.reader._read) // Start the reader
tun.ckr.init(tun) tun.ckr.init(tun)
go func() {
// TODO: put this somewhere more elegant
buf := make([]byte, 65535)
for {
n, _, err := tun.packetConn.ReadFrom(buf)
if err != nil {
log.Errorln("tun.packetConn.ReadFrom:", err)
continue
}
tun.writer.writeFrom(nil, buf[:n])
}
}()
return nil return nil
} }
@ -223,61 +230,3 @@ func (tun *TunAdapter) UpdateConfig(config *config.NodeConfig) {
// Notify children about the configuration change // Notify children about the configuration change
tun.Act(nil, tun.ckr.configure) tun.Act(nil, tun.ckr.configure)
} }
func (tun *TunAdapter) handler() error {
for {
// Accept the incoming connection
conn, err := tun.listener.Accept()
if err != nil {
tun.log.Errorln("TUN connection accept error:", err)
return err
}
phony.Block(tun, func() {
if _, err := tun._wrap(conn.(*yggdrasil.Conn)); err != nil {
// Something went wrong when storing the connection, typically that
// something already exists for this address or subnet
tun.log.Debugln("TUN handler wrap:", err)
}
})
}
}
func (tun *TunAdapter) _wrap(conn *yggdrasil.Conn) (c *tunConn, err error) {
// Prepare a session wrapper for the given connection
s := tunConn{
tun: tun,
conn: conn,
stop: make(chan struct{}),
}
c = &s
// Get the remote address and subnet of the other side
remotePubKey := conn.RemoteAddr().(*crypto.BoxPubKey)
remoteNodeID := crypto.GetNodeID(remotePubKey)
s.addr = *address.AddrForNodeID(remoteNodeID)
s.snet = *address.SubnetForNodeID(remoteNodeID)
// Work out if this is already a destination we already know about
atc, aok := tun.addrToConn[s.addr]
stc, sok := tun.subnetToConn[s.snet]
// If we know about a connection for this destination already then assume it
// is no longer valid and close it
if aok {
atc._close_from_tun()
err = errors.New("replaced connection for address")
} else if sok {
stc._close_from_tun()
err = errors.New("replaced connection for subnet")
}
// Save the session wrapper so that we can look it up quickly next time
// we receive a packet through the interface for this address
tun.addrToConn[s.addr] = &s
tun.subnetToConn[s.snet] = &s
// Set the read callback and start the timeout
conn.SetReadCallback(func(bs []byte) {
s.Act(conn, func() {
s._read(bs)
})
})
s.Act(nil, s.stillAlive)
// Return
return c, err
}

View file

@ -11,6 +11,7 @@ import (
"github.com/gologme/log" "github.com/gologme/log"
"github.com/yggdrasil-network/yggdrasil-go/src/address" "github.com/yggdrasil-network/yggdrasil-go/src/address"
"github.com/yggdrasil-network/yggdrasil-go/src/crypto" "github.com/yggdrasil-network/yggdrasil-go/src/crypto"
"github.com/yggdrasil-network/yggdrasil-go/src/types"
"github.com/Arceliar/phony" "github.com/Arceliar/phony"
) )
@ -99,7 +100,7 @@ type Session struct {
Coords []uint64 // The coordinates of the remote node Coords []uint64 // The coordinates of the remote node
BytesSent uint64 // Bytes sent to the session BytesSent uint64 // Bytes sent to the session
BytesRecvd uint64 // Bytes received from the session BytesRecvd uint64 // Bytes received from the session
MTU MTU // The maximum supported message size of the session MTU types.MTU // The maximum supported message size of the session
Uptime time.Duration // How long this session has been active for Uptime time.Duration // How long this session has been active for
WasMTUFixed bool // This field is no longer used WasMTUFixed bool // This field is no longer used
} }
@ -225,32 +226,50 @@ func (c *Core) GetSessions() []Session {
return sessions return sessions
} }
// ConnListen returns a listener for Yggdrasil session connections. You can only // PacketConn returns a net.PacketConn which can be used to send and receive
// call this function once as each Yggdrasil node can only have a single // information over Yggdrasil sessions.
// ConnListener. Make sure to keep the reference to this for as long as it is func (c *Core) PacketConn() net.PacketConn {
// needed. return c.router.sessions.packetConn
func (c *Core) ConnListen() (*Listener, error) {
c.router.sessions.listenerMutex.Lock()
defer c.router.sessions.listenerMutex.Unlock()
if c.router.sessions.listener != nil {
return nil, errors.New("a listener already exists")
}
c.router.sessions.listener = &Listener{
core: c,
conn: make(chan *Conn),
close: make(chan interface{}),
}
return c.router.sessions.listener, nil
} }
// ConnDialer returns a dialer for Yggdrasil session connections. Since // Resolve takes a masked node ID and performs a search, returning the complete
// ConnDialers are stateless, you can request as many dialers as you like, // node ID and the node's public key.
// although ideally you should request only one and keep the reference to it for func (c *Core) Resolve(nodeID, nodeMask *crypto.NodeID) (fullNodeID *crypto.NodeID, boxPubKey *crypto.BoxPubKey, err error) {
// as long as it is needed. fmt.Println("**** START RESOLVE")
func (c *Core) ConnDialer() (*Dialer, error) { defer fmt.Println("**** END RESOLVE")
return &Dialer{
core: c, done := make(chan struct{})
}, nil c.router.Act(c, func() {
_, isIn := c.router.searches.searches[*nodeID]
if !isIn {
searchCompleted := func(sinfo *sessionInfo, e error) {
select {
case <-done:
// Somehow this was called multiple times, TODO don't let that happen
if sinfo != nil {
// Need to clean up to avoid a session leak
sinfo.cancel.Cancel(nil)
sinfo.sessions.removeSession(sinfo)
}
default:
if sinfo != nil {
fullNodeID = crypto.GetNodeID(&sinfo.theirPermPub)
boxPubKey = &sinfo.theirPermPub
c.router.sessions.createSession(&sinfo.theirPermPub)
}
err = e
close(done)
}
}
sinfo := c.router.searches.newIterSearch(nodeID, nodeMask, searchCompleted)
sinfo.startSearch()
} else {
err = errors.New("search already exists")
close(done)
}
})
<-done
return
} }
// ListenTCP starts a new TCP listener. The input URI should match that of the // ListenTCP starts a new TCP listener. The input URI should match that of the
@ -344,8 +363,8 @@ func (c *Core) SetNodeInfo(nodeinfo interface{}, nodeinfoprivacy bool) {
} }
// GetMaximumSessionMTU returns the maximum allowed session MTU size. // GetMaximumSessionMTU returns the maximum allowed session MTU size.
func (c *Core) GetMaximumSessionMTU() MTU { func (c *Core) GetMaximumSessionMTU() types.MTU {
var mtu MTU var mtu types.MTU
phony.Block(&c.router, func() { phony.Block(&c.router, func() {
mtu = c.router.sessions.myMaximumMTU mtu = c.router.sessions.myMaximumMTU
}) })
@ -355,7 +374,7 @@ func (c *Core) GetMaximumSessionMTU() MTU {
// SetMaximumSessionMTU sets the maximum allowed session MTU size. The default // SetMaximumSessionMTU sets the maximum allowed session MTU size. The default
// value is 65535 bytes. Session pings will be sent to update all open sessions // value is 65535 bytes. Session pings will be sent to update all open sessions
// if the MTU has changed. // if the MTU has changed.
func (c *Core) SetMaximumSessionMTU(mtu MTU) { func (c *Core) SetMaximumSessionMTU(mtu types.MTU) {
phony.Block(&c.router, func() { phony.Block(&c.router, func() {
if c.router.sessions.myMaximumMTU != mtu { if c.router.sessions.myMaximumMTU != mtu {
c.router.sessions.myMaximumMTU = mtu c.router.sessions.myMaximumMTU = mtu

View file

@ -1,399 +0,0 @@
package yggdrasil
import (
"errors"
"fmt"
"net"
"time"
"github.com/yggdrasil-network/yggdrasil-go/src/crypto"
"github.com/yggdrasil-network/yggdrasil-go/src/types"
"github.com/yggdrasil-network/yggdrasil-go/src/util"
"github.com/Arceliar/phony"
)
type MTU = types.MTU
// ConnError implements the net.Error interface
type ConnError struct {
error
timeout bool
temporary bool
closed bool
maxsize int
}
// Timeout returns true if the error relates to a timeout condition on the
// connection.
func (e *ConnError) Timeout() bool {
return e.timeout
}
// Temporary return true if the error is temporary or false if it is a permanent
// error condition.
func (e *ConnError) Temporary() bool {
return e.temporary
}
// PacketTooBig returns in response to sending a packet that is too large, and
// if so, the maximum supported packet size that should be used for the
// connection.
func (e *ConnError) PacketTooBig() bool {
return e.maxsize > 0
}
// PacketMaximumSize returns the maximum supported packet size. This will only
// return a non-zero value if ConnError.PacketTooBig() returns true.
func (e *ConnError) PacketMaximumSize() int {
if !e.PacketTooBig() {
return 0
}
return e.maxsize
}
// Closed returns if the session is already closed and is now unusable.
func (e *ConnError) Closed() bool {
return e.closed
}
// The Conn struct is a reference to an active connection session between the
// local node and a remote node. Conn implements the io.ReadWriteCloser
// interface and is used to send and receive traffic with a remote node.
type Conn struct {
phony.Inbox
core *Core
readDeadline *time.Time
writeDeadline *time.Time
nodeID *crypto.NodeID
nodeMask *crypto.NodeID
session *sessionInfo
mtu MTU
readCallback func([]byte)
readBuffer chan []byte
}
// TODO func NewConn() that initializes additional fields as needed
func newConn(core *Core, nodeID *crypto.NodeID, nodeMask *crypto.NodeID, session *sessionInfo) *Conn {
conn := Conn{
core: core,
nodeID: nodeID,
nodeMask: nodeMask,
session: session,
readBuffer: make(chan []byte, 1024),
}
return &conn
}
// String returns a string that uniquely identifies a connection. Currently this
// takes a form similar to "conn=0x0000000", which contains a memory reference
// to the Conn object. While this value should always be unique for each Conn
// object, the format of this is not strictly defined and may change in the
// future.
func (c *Conn) String() string {
var s string
phony.Block(c, func() { s = fmt.Sprintf("conn=%p", c) })
return s
}
func (c *Conn) setMTU(from phony.Actor, mtu MTU) {
c.Act(from, func() { c.mtu = mtu })
}
// This should never be called from an actor, used in the dial functions
func (c *Conn) search() error {
var err error
done := make(chan struct{})
phony.Block(&c.core.router, func() {
_, isIn := c.core.router.searches.searches[*c.nodeID]
if !isIn {
searchCompleted := func(sinfo *sessionInfo, e error) {
select {
case <-done:
// Somehow this was called multiple times, TODO don't let that happen
if sinfo != nil {
// Need to clean up to avoid a session leak
sinfo.cancel.Cancel(nil)
sinfo.sessions.removeSession(sinfo)
}
default:
if sinfo != nil {
// Finish initializing the session
c.session = sinfo
c.session.setConn(nil, c)
c.nodeID = crypto.GetNodeID(&c.session.theirPermPub)
for i := range c.nodeMask {
c.nodeMask[i] = 0xFF
}
}
err = e
close(done)
}
}
sinfo := c.core.router.searches.newIterSearch(c.nodeID, c.nodeMask, searchCompleted)
sinfo.startSearch()
} else {
err = errors.New("search already exists")
close(done)
}
})
<-done
if c.session == nil && err == nil {
panic("search failed but returned no error")
}
return err
}
// Used in session keep-alive traffic
func (c *Conn) _doSearch() {
s := fmt.Sprintf("conn=%p", c)
routerWork := func() {
// Check to see if there is a search already matching the destination
sinfo, isIn := c.core.router.searches.searches[*c.nodeID]
if !isIn {
// Nothing was found, so create a new search
searchCompleted := func(sinfo *sessionInfo, e error) {}
sinfo = c.core.router.searches.newIterSearch(c.nodeID, c.nodeMask, searchCompleted)
c.core.log.Debugf("%s DHT search started: %p", s, sinfo)
// Start the search
sinfo.startSearch()
}
}
c.core.router.Act(c.session, routerWork)
}
func (c *Conn) _getDeadlineCancellation(t *time.Time) (util.Cancellation, bool) {
if t != nil {
// A deadline is set, so return a Cancellation that uses it
c := util.CancellationWithDeadline(c.session.cancel, *t)
return c, true
} else {
// No deadline was set, so just return the existing cancellation and a dummy value
return c.session.cancel, false
}
}
// SetReadCallback allows you to specify a function that will be called whenever
// a packet is received. This should be used if you wish to implement
// asynchronous patterns for receiving data from the remote node.
//
// Note that if a read callback has been supplied, you should no longer attempt
// to use the synchronous Read function.
func (c *Conn) SetReadCallback(callback func([]byte)) {
c.Act(nil, func() {
c.readCallback = callback
c._drainReadBuffer()
})
}
func (c *Conn) _drainReadBuffer() {
if c.readCallback == nil {
return
}
select {
case bs := <-c.readBuffer:
c.readCallback(bs)
c.Act(nil, c._drainReadBuffer) // In case there's more
default:
}
}
// Called by the session to pass a new message to the Conn
func (c *Conn) recvMsg(from phony.Actor, msg []byte) {
c.Act(from, func() {
if c.readCallback != nil {
c.readCallback(msg)
} else {
select {
case c.readBuffer <- msg:
default:
}
}
})
}
// Used internally by Read, the caller is responsible for util.PutBytes when they're done.
func (c *Conn) readNoCopy() ([]byte, error) {
var cancel util.Cancellation
var doCancel bool
phony.Block(c, func() { cancel, doCancel = c._getDeadlineCancellation(c.readDeadline) })
if doCancel {
defer cancel.Cancel(nil)
}
// Wait for some traffic to come through from the session
select {
case <-cancel.Finished():
if cancel.Error() == util.CancellationTimeoutError {
return nil, ConnError{errors.New("read timeout"), true, false, false, 0}
} else {
return nil, ConnError{errors.New("session closed"), false, false, true, 0}
}
case bs := <-c.readBuffer:
return bs, nil
}
}
// Read allows you to read from the connection in a synchronous fashion. The
// function will block up until the point that either new data is available, the
// connection has been closed or the read deadline has been reached. If the
// function succeeds, the number of bytes read from the connection will be
// returned. Otherwise, an error condition will be returned.
//
// Note that you can also implement asynchronous reads by using SetReadCallback.
// If you do that, you should no longer attempt to use the Read function.
func (c *Conn) Read(b []byte) (int, error) {
bs, err := c.readNoCopy()
if err != nil {
return 0, err
}
n := len(bs)
if len(bs) > len(b) {
n = len(b)
err = ConnError{errors.New("read buffer too small for entire packet"), false, true, false, 0}
}
// Copy results to the output slice and clean up
copy(b, bs)
// Return the number of bytes copied to the slice, along with any error
return n, err
}
func (c *Conn) _write(msg FlowKeyMessage) error {
if len(msg.Message) > int(c.mtu) {
return ConnError{errors.New("packet too big"), true, false, false, int(c.mtu)}
}
c.session.Act(c, func() {
// Send the packet
c.session._send(msg)
// Session keep-alive, while we wait for the crypto workers from send
switch {
case time.Since(c.session.time) > 6*time.Second:
if c.session.time.Before(c.session.pingTime) && time.Since(c.session.pingTime) > 6*time.Second {
// TODO double check that the above condition is correct
c._doSearch()
} else {
c.session.ping(c.session) // TODO send from self if this becomes an actor
}
case c.session.reset && c.session.pingTime.Before(c.session.time):
c.session.ping(c.session) // TODO send from self if this becomes an actor
default: // Don't do anything, to keep traffic throttled
}
})
return nil
}
// WriteFrom should be called by a phony.Actor, and tells the Conn to send a
// message. This is used internally by Write. If the callback is called with a
// non-nil value, then it is safe to reuse the argument FlowKeyMessage.
func (c *Conn) WriteFrom(from phony.Actor, msg FlowKeyMessage, callback func(error)) {
c.Act(from, func() {
callback(c._write(msg))
})
}
// writeNoCopy is used internally by Write and makes use of WriteFrom under the hood.
// The caller must not reuse the argument FlowKeyMessage when a nil error is returned.
func (c *Conn) writeNoCopy(msg FlowKeyMessage) error {
var cancel util.Cancellation
var doCancel bool
phony.Block(c, func() { cancel, doCancel = c._getDeadlineCancellation(c.writeDeadline) })
if doCancel {
defer cancel.Cancel(nil)
}
var err error
select {
case <-cancel.Finished():
if cancel.Error() == util.CancellationTimeoutError {
err = ConnError{errors.New("write timeout"), true, false, false, 0}
} else {
err = ConnError{errors.New("session closed"), false, false, true, 0}
}
default:
done := make(chan struct{})
callback := func(e error) { err = e; close(done) }
c.WriteFrom(nil, msg, callback)
<-done
}
return err
}
// Write allows you to write to the connection in a synchronous fashion. This
// function may block until either the write has completed, the connection has
// been closed or the write deadline has been reached. If the function succeeds,
// the number of written bytes is returned. Otherwise, an error condition is
// returned.
func (c *Conn) Write(b []byte) (int, error) {
written := len(b)
bs := make([]byte, 0, len(b)+crypto.BoxOverhead)
bs = append(bs, b...)
msg := FlowKeyMessage{Message: bs}
err := c.writeNoCopy(msg)
if err != nil {
written = 0
}
return written, err
}
// Close will close an open connection and any blocking operations on the
// connection will unblock and return. From this point forward, the connection
// can no longer be used and you should no longer attempt to Read or Write to
// the connection.
func (c *Conn) Close() (err error) {
phony.Block(c, func() {
if c.session != nil {
// Close the session, if it hasn't been closed already
if e := c.session.cancel.Cancel(errors.New("connection closed")); e != nil {
err = ConnError{errors.New("close failed, session already closed"), false, false, true, 0}
} else {
c.session.doRemove()
}
}
})
return
}
// LocalAddr returns the complete public key of the local side of the
// connection. This is always going to return your own node's public key.
func (c *Conn) LocalAddr() net.Addr {
return &c.core.boxPub
}
// RemoteAddr returns the complete public key of the remote side of the
// connection.
func (c *Conn) RemoteAddr() net.Addr {
if c.session != nil {
return &c.session.theirPermPub
}
return nil
}
// SetDeadline is equivalent to calling both SetReadDeadline and
// SetWriteDeadline with the same value, configuring the maximum amount of time
// that synchronous Read and Write operations can block for. If no deadline is
// configured, Read and Write operations can potentially block indefinitely.
func (c *Conn) SetDeadline(t time.Time) error {
c.SetReadDeadline(t)
c.SetWriteDeadline(t)
return nil
}
// SetReadDeadline configures the maximum amount of time that a synchronous Read
// operation can block for. A Read operation will unblock at the point that the
// read deadline is reached if no other condition (such as data arrival or
// connection closure) happens first. If no deadline is configured, Read
// operations can potentially block indefinitely.
func (c *Conn) SetReadDeadline(t time.Time) error {
// TODO warn that this can block while waiting for the Conn actor to run, so don't call it from other actors...
phony.Block(c, func() { c.readDeadline = &t })
return nil
}
// SetWriteDeadline configures the maximum amount of time that a synchronous
// Write operation can block for. A Write operation will unblock at the point
// that the read deadline is reached if no other condition (such as data sending
// or connection closure) happens first. If no deadline is configured, Write
// operations can potentially block indefinitely.
func (c *Conn) SetWriteDeadline(t time.Time) error {
// TODO warn that this can block while waiting for the Conn actor to run, so don't call it from other actors...
phony.Block(c, func() { c.writeDeadline = &t })
return nil
}

View file

@ -1,120 +0,0 @@
package yggdrasil
import (
"context"
"encoding/hex"
"errors"
"net"
"strconv"
"strings"
"time"
"github.com/yggdrasil-network/yggdrasil-go/src/crypto"
)
// Dialer represents an Yggdrasil connection dialer.
type Dialer struct {
core *Core
}
// Dial opens a session to the given node. The first parameter should be
// "curve25519" or "nodeid" and the second parameter should contain a
// hexadecimal representation of the target. It uses DialContext internally.
func (d *Dialer) Dial(network, address string) (net.Conn, error) {
return d.DialContext(nil, network, address)
}
// DialContext is used internally by Dial, and should only be used with a
// context that includes a timeout. It uses DialByNodeIDandMask internally when
// the network is "nodeid", or DialByPublicKey when the network is "curve25519".
func (d *Dialer) DialContext(ctx context.Context, network, address string) (net.Conn, error) {
var nodeID crypto.NodeID
var nodeMask crypto.NodeID
// Process
switch network {
case "curve25519":
dest, err := hex.DecodeString(address)
if err != nil {
return nil, err
}
if len(dest) != crypto.BoxPubKeyLen {
return nil, errors.New("invalid key length supplied")
}
var pubKey crypto.BoxPubKey
copy(pubKey[:], dest)
return d.DialByPublicKey(ctx, &pubKey)
case "nodeid":
// A node ID was provided - we don't need to do anything special with it
if tokens := strings.Split(address, "/"); len(tokens) == 2 {
l, err := strconv.Atoi(tokens[1])
if err != nil {
return nil, err
}
dest, err := hex.DecodeString(tokens[0])
if err != nil {
return nil, err
}
copy(nodeID[:], dest)
for idx := 0; idx < l; idx++ {
nodeMask[idx/8] |= 0x80 >> byte(idx%8)
}
} else {
dest, err := hex.DecodeString(tokens[0])
if err != nil {
return nil, err
}
copy(nodeID[:], dest)
for i := range nodeMask {
nodeMask[i] = 0xFF
}
}
return d.DialByNodeIDandMask(ctx, &nodeID, &nodeMask)
default:
// An unexpected address type was given, so give up
return nil, errors.New("unexpected address type")
}
}
// DialByNodeIDandMask opens a session to the given node based on raw NodeID
// parameters. If ctx is nil or has no timeout, then a default timeout of 6
// seconds will apply, beginning *after* the search finishes.
func (d *Dialer) DialByNodeIDandMask(ctx context.Context, nodeID, nodeMask *crypto.NodeID) (net.Conn, error) {
startDial := time.Now()
conn := newConn(d.core, nodeID, nodeMask, nil)
if err := conn.search(); err != nil {
// TODO: make searches take a context, so they can be cancelled early
conn.Close()
return nil, err
}
endSearch := time.Now()
d.core.log.Debugln("Dial searched for:", nodeID, "in time:", endSearch.Sub(startDial))
conn.session.setConn(nil, conn)
var cancel context.CancelFunc
if ctx == nil {
ctx = context.Background()
}
ctx, cancel = context.WithTimeout(ctx, 6*time.Second)
defer cancel()
select {
case <-conn.session.init:
endInit := time.Now()
d.core.log.Debugln("Dial initialized session for:", nodeID, "in time:", endInit.Sub(endSearch))
d.core.log.Debugln("Finished dial for:", nodeID, "in time:", endInit.Sub(startDial))
return conn, nil
case <-ctx.Done():
conn.Close()
return nil, errors.New("session handshake timeout")
}
}
// DialByPublicKey opens a session to the given node based on the public key. If
// ctx is nil or has no timeout, then a default timeout of 6 seconds will apply,
// beginning *after* the search finishes.
func (d *Dialer) DialByPublicKey(ctx context.Context, pubKey *crypto.BoxPubKey) (net.Conn, error) {
nodeID := crypto.GetNodeID(pubKey)
var nodeMask crypto.NodeID
for i := range nodeMask {
nodeMask[i] = 0xFF
}
return d.DialByNodeIDandMask(ctx, nodeID, &nodeMask)
}

View file

@ -1,45 +0,0 @@
package yggdrasil
import (
"errors"
"net"
)
// Listener waits for incoming sessions
type Listener struct {
core *Core
conn chan *Conn
close chan interface{}
}
// Accept blocks until a new incoming session is received
func (l *Listener) Accept() (net.Conn, error) {
select {
case c, ok := <-l.conn:
if !ok {
return nil, errors.New("listener closed")
}
return c, nil
case <-l.close:
return nil, errors.New("listener closed")
}
}
// Close will stop the listener
func (l *Listener) Close() (err error) {
defer func() {
recover()
err = errors.New("already closed")
}()
if l.core.router.sessions.listener == l {
l.core.router.sessions.listener = nil
}
close(l.close)
close(l.conn)
return nil
}
// Addr returns the address of the listener
func (l *Listener) Addr() net.Addr {
return &l.core.boxPub
}

175
src/yggdrasil/packetconn.go Normal file
View file

@ -0,0 +1,175 @@
package yggdrasil
import (
"errors"
"net"
"time"
"github.com/Arceliar/phony"
"github.com/yggdrasil-network/yggdrasil-go/src/crypto"
"github.com/yggdrasil-network/yggdrasil-go/src/types"
)
type packet struct {
addr *crypto.BoxPubKey
payload []byte
}
type PacketConn struct {
phony.Inbox
net.PacketConn
closed bool
readBuffer chan packet
sessions *sessions
}
func newPacketConn(ss *sessions) *PacketConn {
return &PacketConn{
sessions: ss,
readBuffer: make(chan packet),
}
}
// implements net.PacketConn
func (c *PacketConn) ReadFrom(b []byte) (n int, addr net.Addr, err error) {
if c.closed {
return 0, nil, PacketConnError{closed: true}
}
packet := <-c.readBuffer
copy(b, packet.payload)
return len(packet.payload), packet.addr, nil
}
// implements net.PacketConn
func (c *PacketConn) WriteTo(b []byte, addr net.Addr) (int, error) {
if c.closed {
return 0, PacketConnError{closed: true}
}
boxPubKey, ok := addr.(*crypto.BoxPubKey)
if !ok {
return 0, errors.New("expected crypto.BoxPubKey as net.Addr")
}
session, ok := c.sessions.getByTheirPerm(boxPubKey)
if !ok {
return 0, errors.New("expected a session but there was none")
}
err := make(chan error, 1)
msg := FlowKeyMessage{Message: b}
nodeID := crypto.GetNodeID(boxPubKey)
nodeMask := &crypto.NodeID{}
for i := range nodeMask {
nodeMask[i] = 0xFF
}
session.Act(c, func() {
// Check if the packet is small enough to go through this session
sessionMTU := session._getMTU()
if types.MTU(len(b)) > sessionMTU {
err <- PacketConnError{maxsize: int(sessionMTU)}
return
}
// Send the packet
session._send(msg)
// Session keep-alive, while we wait for the crypto workers from send
switch {
case time.Since(session.time) > 6*time.Second:
if session.time.Before(session.pingTime) && time.Since(session.pingTime) > 6*time.Second {
// TODO double check that the above condition is correct
c.sessions.router.Act(session, func() {
// Check to see if there is a search already matching the destination
sinfo, isIn := c.sessions.router.searches.searches[*nodeID]
if !isIn {
// Nothing was found, so create a new search
searchCompleted := func(sinfo *sessionInfo, e error) {}
sinfo = c.sessions.router.searches.newIterSearch(nodeID, nodeMask, searchCompleted)
c.sessions.router.core.log.Debugf("DHT search started: %p", sinfo)
// Start the search
sinfo.startSearch()
}
})
} else {
session.ping(session) // TODO send from self if this becomes an actor
}
case session.reset && session.pingTime.Before(session.time):
session.ping(session) // TODO send from self if this becomes an actor
default: // Don't do anything, to keep traffic throttled
}
err <- nil
})
e := <-err
return len(b), e
}
// implements net.PacketConn
func (c *PacketConn) Close() error {
return nil
}
// implements net.PacketConn
func (c *PacketConn) LocalAddr() net.Addr {
return &c.sessions.router.core.boxPub
}
// implements net.PacketConn
func (c *PacketConn) SetDeadline(t time.Time) error {
return nil
}
// implements net.PacketConn
func (c *PacketConn) SetReadDeadline(t time.Time) error {
return nil
}
// implements net.PacketConn
func (c *PacketConn) SetWriteDeadline(t time.Time) error {
return nil
}
// PacketConnError implements the net.Error interface
type PacketConnError struct {
error
timeout bool
temporary bool
closed bool
maxsize int
}
// Timeout returns true if the error relates to a timeout condition on the
// connection.
func (e *PacketConnError) Timeout() bool {
return e.timeout
}
// Temporary return true if the error is temporary or false if it is a permanent
// error condition.
func (e *PacketConnError) Temporary() bool {
return e.temporary
}
// PacketTooBig returns in response to sending a packet that is too large, and
// if so, the maximum supported packet size that should be used for the
// connection.
func (e *PacketConnError) PacketTooBig() bool {
return e.maxsize > 0
}
// PacketMaximumSize returns the maximum supported packet size. This will only
// return a non-zero value if ConnError.PacketTooBig() returns true.
func (e *PacketConnError) PacketMaximumSize() int {
if !e.PacketTooBig() {
return 0
}
return e.maxsize
}
// Closed returns if the session is already closed and is now unusable.
func (e *PacketConnError) Closed() bool {
return e.closed
}

View file

@ -11,6 +11,7 @@ import (
"github.com/yggdrasil-network/yggdrasil-go/src/address" "github.com/yggdrasil-network/yggdrasil-go/src/address"
"github.com/yggdrasil-network/yggdrasil-go/src/crypto" "github.com/yggdrasil-network/yggdrasil-go/src/crypto"
"github.com/yggdrasil-network/yggdrasil-go/src/types"
"github.com/yggdrasil-network/yggdrasil-go/src/util" "github.com/yggdrasil-network/yggdrasil-go/src/util"
"github.com/Arceliar/phony" "github.com/Arceliar/phony"
@ -33,8 +34,8 @@ type sessionInfo struct {
myHandle crypto.Handle // myHandle crypto.Handle //
theirNonce crypto.BoxNonce // theirNonce crypto.BoxNonce //
myNonce crypto.BoxNonce // myNonce crypto.BoxNonce //
theirMTU MTU // theirMTU types.MTU //
myMTU MTU // myMTU types.MTU //
wasMTUFixed bool // Was the MTU fixed by a receive error? wasMTUFixed bool // Was the MTU fixed by a receive error?
timeOpened time.Time // Time the session was opened timeOpened time.Time // Time the session was opened
time time.Time // Time we last received a packet time time.Time // Time we last received a packet
@ -47,7 +48,7 @@ type sessionInfo struct {
bytesRecvd uint64 // Bytes of real traffic received in this session bytesRecvd uint64 // Bytes of real traffic received in this session
init chan struct{} // Closed when the first session pong arrives, used to signal that the session is ready for initial use init chan struct{} // Closed when the first session pong arrives, used to signal that the session is ready for initial use
cancel util.Cancellation // Used to terminate workers cancel util.Cancellation // Used to terminate workers
conn *Conn // The associated Conn object conn *PacketConn // The associated PacketConn object
callbacks []chan func() // Finished work from crypto workers callbacks []chan func() // Finished work from crypto workers
table *lookupTable // table.self is a locator where we get our coords table *lookupTable // table.self is a locator where we get our coords
} }
@ -60,7 +61,7 @@ type sessionPing struct {
Coords []byte // Coords []byte //
Tstamp int64 // unix time, but the only real requirement is that it increases Tstamp int64 // unix time, but the only real requirement is that it increases
IsPong bool // IsPong bool //
MTU MTU // MTU types.MTU //
} }
// Updates session info in response to a ping, after checking that the ping is OK. // Updates session info in response to a ping, after checking that the ping is OK.
@ -83,9 +84,6 @@ func (s *sessionInfo) _update(p *sessionPing) bool {
} }
if p.MTU >= 1280 || p.MTU == 0 { if p.MTU >= 1280 || p.MTU == 0 {
s.theirMTU = p.MTU s.theirMTU = p.MTU
if s.conn != nil {
s.conn.setMTU(s, s._getMTU())
}
} }
if !bytes.Equal(s.coords, p.Coords) { if !bytes.Equal(s.coords, p.Coords) {
// allocate enough space for additional coords // allocate enough space for additional coords
@ -109,12 +107,11 @@ func (s *sessionInfo) _update(p *sessionPing) bool {
// Additionally, stores maps of address/subnet onto keys, and keys onto handles. // Additionally, stores maps of address/subnet onto keys, and keys onto handles.
type sessions struct { type sessions struct {
router *router router *router
listener *Listener packetConn *PacketConn
listenerMutex sync.Mutex
lastCleanup time.Time lastCleanup time.Time
isAllowedHandler func(pubkey *crypto.BoxPubKey, initiator bool) bool // Returns true or false if session setup is allowed isAllowedHandler func(pubkey *crypto.BoxPubKey, initiator bool) bool // Returns true or false if session setup is allowed
isAllowedMutex sync.RWMutex // Protects the above isAllowedMutex sync.RWMutex // Protects the above
myMaximumMTU MTU // Maximum allowed session MTU myMaximumMTU types.MTU // Maximum allowed session MTU
permShared map[crypto.BoxPubKey]*crypto.BoxSharedKey // Maps known permanent keys to their shared key, used by DHT a lot permShared map[crypto.BoxPubKey]*crypto.BoxSharedKey // Maps known permanent keys to their shared key, used by DHT a lot
sinfos map[crypto.Handle]*sessionInfo // Maps handle onto session info sinfos map[crypto.Handle]*sessionInfo // Maps handle onto session info
byTheirPerm map[crypto.BoxPubKey]*crypto.Handle // Maps theirPermPub onto handle byTheirPerm map[crypto.BoxPubKey]*crypto.Handle // Maps theirPermPub onto handle
@ -123,6 +120,7 @@ type sessions struct {
// Initializes the session struct. // Initializes the session struct.
func (ss *sessions) init(r *router) { func (ss *sessions) init(r *router) {
ss.router = r ss.router = r
ss.packetConn = newPacketConn(ss)
ss.permShared = make(map[crypto.BoxPubKey]*crypto.BoxSharedKey) ss.permShared = make(map[crypto.BoxPubKey]*crypto.BoxSharedKey)
ss.sinfos = make(map[crypto.Handle]*sessionInfo) ss.sinfos = make(map[crypto.Handle]*sessionInfo)
ss.byTheirPerm = make(map[crypto.BoxPubKey]*crypto.Handle) ss.byTheirPerm = make(map[crypto.BoxPubKey]*crypto.Handle)
@ -330,13 +328,6 @@ func (sinfo *sessionInfo) _sendPingPong(isPong bool) {
} }
} }
func (sinfo *sessionInfo) setConn(from phony.Actor, conn *Conn) {
sinfo.Act(from, func() {
sinfo.conn = conn
sinfo.conn.setMTU(sinfo, sinfo._getMTU())
})
}
// Handles a session ping, creating a session if needed and calling update, then possibly responding with a pong if the ping was in ping mode and the update was successful. // Handles a session ping, creating a session if needed and calling update, then possibly responding with a pong if the ping was in ping mode and the update was successful.
// If the session has a packet cached (common when first setting up a session), it will be sent. // If the session has a packet cached (common when first setting up a session), it will be sent.
func (ss *sessions) handlePing(ping *sessionPing) { func (ss *sessions) handlePing(ping *sessionPing) {
@ -347,23 +338,12 @@ func (ss *sessions) handlePing(ping *sessionPing) {
case isIn: // Session already exists case isIn: // Session already exists
case !ss.isSessionAllowed(&ping.SendPermPub, false): // Session is not allowed case !ss.isSessionAllowed(&ping.SendPermPub, false): // Session is not allowed
default: default:
ss.listenerMutex.Lock() // This is a ping from an allowed node for which no session exists, and we have a listener ready to handle sessions.
if ss.listener != nil { // We need to create a session and pass it to the listener.
// This is a ping from an allowed node for which no session exists, and we have a listener ready to handle sessions. sinfo = ss.createSession(&ping.SendPermPub)
// We need to create a session and pass it to the listener. if s, _ := ss.getByTheirPerm(&ping.SendPermPub); s != sinfo {
sinfo = ss.createSession(&ping.SendPermPub) panic("This should not happen")
if s, _ := ss.getByTheirPerm(&ping.SendPermPub); s != sinfo {
panic("This should not happen")
}
conn := newConn(ss.router.core, crypto.GetNodeID(&sinfo.theirPermPub), &crypto.NodeID{}, sinfo)
for i := range conn.nodeMask {
conn.nodeMask[i] = 0xFF
}
sinfo.setConn(ss.router, conn)
c := ss.listener.conn
go func() { c <- conn }()
} }
ss.listenerMutex.Unlock()
} }
if sinfo != nil { if sinfo != nil {
sinfo.Act(ss.router, func() { sinfo.Act(ss.router, func() {
@ -381,7 +361,7 @@ func (ss *sessions) handlePing(ping *sessionPing) {
// Get the MTU of the session. // Get the MTU of the session.
// Will be equal to the smaller of this node's MTU or the remote node's MTU. // Will be equal to the smaller of this node's MTU or the remote node's MTU.
// If sending over links with a maximum message size (this was a thing with the old UDP code), it could be further lowered, to a minimum of 1280. // If sending over links with a maximum message size (this was a thing with the old UDP code), it could be further lowered, to a minimum of 1280.
func (sinfo *sessionInfo) _getMTU() MTU { func (sinfo *sessionInfo) _getMTU() types.MTU {
if sinfo.theirMTU == 0 || sinfo.myMTU == 0 { if sinfo.theirMTU == 0 || sinfo.myMTU == 0 {
return 0 return 0
} }
@ -467,7 +447,10 @@ func (sinfo *sessionInfo) _recvPacket(p *wire_trafficPacket) {
} }
sinfo._updateNonce(&p.Nonce) sinfo._updateNonce(&p.Nonce)
sinfo.bytesRecvd += uint64(len(bs)) sinfo.bytesRecvd += uint64(len(bs))
sinfo.conn.recvMsg(sinfo, bs) sinfo.sessions.packetConn.readBuffer <- packet{
addr: &sinfo.theirPermPub,
payload: bs,
}
} }
ch <- callback ch <- callback
sinfo.checkCallbacks() sinfo.checkCallbacks()

View file

@ -9,6 +9,7 @@ package yggdrasil
import ( import (
"github.com/yggdrasil-network/yggdrasil-go/src/crypto" "github.com/yggdrasil-network/yggdrasil-go/src/crypto"
"github.com/yggdrasil-network/yggdrasil-go/src/types"
) )
const ( const (
@ -382,7 +383,7 @@ func (p *sessionPing) decode(bs []byte) bool {
if pType == wire_SessionPong { if pType == wire_SessionPong {
p.IsPong = true p.IsPong = true
} }
p.MTU = MTU(mtu) p.MTU = types.MTU(mtu)
return true return true
} }