From aed13e6d501461c3464057dda8efb151074837ac Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 1 Jul 2020 19:18:50 +0100 Subject: [PATCH] Initial work on net.PacketConn --- cmd/yggdrasil/main.go | 16 +- src/tuntap/conn.go | 227 -------------------- src/tuntap/iface.go | 93 ++++----- src/tuntap/tun.go | 121 ++++------- src/yggdrasil/api.go | 75 ++++--- src/yggdrasil/conn.go | 399 ------------------------------------ src/yggdrasil/dialer.go | 120 ----------- src/yggdrasil/listener.go | 45 ---- src/yggdrasil/packetconn.go | 175 ++++++++++++++++ src/yggdrasil/session.go | 53 ++--- src/yggdrasil/wire.go | 3 +- 11 files changed, 321 insertions(+), 1006 deletions(-) delete mode 100644 src/tuntap/conn.go delete mode 100644 src/yggdrasil/conn.go delete mode 100644 src/yggdrasil/dialer.go delete mode 100644 src/yggdrasil/listener.go create mode 100644 src/yggdrasil/packetconn.go diff --git a/cmd/yggdrasil/main.go b/cmd/yggdrasil/main.go index 813e950b..40915d39 100644 --- a/cmd/yggdrasil/main.go +++ b/cmd/yggdrasil/main.go @@ -297,19 +297,11 @@ func main() { } n.multicast.SetupAdminHandlers(n.admin.(*admin.AdminSocket)) // Start the TUN/TAP interface - if listener, err := n.core.ConnListen(); err == nil { - if dialer, err := n.core.ConnDialer(); err == nil { - n.tuntap.Init(&n.core, n.state, logger, tuntap.TunOptions{Listener: listener, Dialer: dialer}) - if err := n.tuntap.Start(); err != nil { - logger.Errorln("An error occurred starting TUN/TAP:", err) - } - n.tuntap.SetupAdminHandlers(n.admin.(*admin.AdminSocket)) - } else { - logger.Errorln("Unable to get Dialer:", err) - } - } else { - logger.Errorln("Unable to get Listener:", err) + n.tuntap.Init(&n.core, n.state, logger, tuntap.TunOptions{PacketConn: n.core.PacketConn()}) + if err := n.tuntap.Start(); err != nil { + logger.Errorln("An error occurred starting TUN/TAP:", err) } + n.tuntap.SetupAdminHandlers(n.admin.(*admin.AdminSocket)) // Make some nice output that tells us what our IPv6 address and subnet are. // This is just logged to stdout for the user. address := n.core.Address() diff --git a/src/tuntap/conn.go b/src/tuntap/conn.go deleted file mode 100644 index ddd89e9b..00000000 --- a/src/tuntap/conn.go +++ /dev/null @@ -1,227 +0,0 @@ -package tuntap - -import ( - "bytes" - "errors" - "time" - - "github.com/Arceliar/phony" - "github.com/yggdrasil-network/yggdrasil-go/src/address" - "github.com/yggdrasil-network/yggdrasil-go/src/crypto" - "github.com/yggdrasil-network/yggdrasil-go/src/util" - "github.com/yggdrasil-network/yggdrasil-go/src/yggdrasil" - "golang.org/x/net/icmp" - "golang.org/x/net/ipv6" -) - -const tunConnTimeout = 2 * time.Minute - -type tunConn struct { - phony.Inbox - tun *TunAdapter - conn *yggdrasil.Conn - addr address.Address - snet address.Subnet - stop chan struct{} - alive *time.Timer // From calling time.AfterFunc -} - -func (s *tunConn) close() { - s.tun.Act(s, s._close_from_tun) -} - -func (s *tunConn) _close_from_tun() { - go s.conn.Close() // Just in case it blocks on actor operations - delete(s.tun.addrToConn, s.addr) - delete(s.tun.subnetToConn, s.snet) - func() { - defer func() { recover() }() - close(s.stop) // Closes reader/writer goroutines - }() -} - -func (s *tunConn) _read(bs []byte) (err error) { - select { - case <-s.stop: - err = errors.New("session was already closed") - return - default: - } - if len(bs) == 0 { - err = errors.New("read packet with 0 size") - return - } - ipv4 := len(bs) > 20 && bs[0]&0xf0 == 0x40 - ipv6 := len(bs) > 40 && bs[0]&0xf0 == 0x60 - isCGA := true - // Check source addresses - switch { - case ipv6 && bs[8] == 0x02 && bytes.Equal(s.addr[:16], bs[8:24]): // source - case ipv6 && bs[8] == 0x03 && bytes.Equal(s.snet[:8], bs[8:16]): // source - default: - isCGA = false - } - // Check destination addresses - switch { - case ipv6 && bs[24] == 0x02 && bytes.Equal(s.tun.addr[:16], bs[24:40]): // destination - case ipv6 && bs[24] == 0x03 && bytes.Equal(s.tun.subnet[:8], bs[24:32]): // destination - default: - isCGA = false - } - // Decide how to handle the packet - var skip bool - switch { - case isCGA: // Allowed - case s.tun.ckr.isEnabled() && (ipv4 || ipv6): - var srcAddr address.Address - var dstAddr address.Address - var addrlen int - if ipv4 { - copy(srcAddr[:], bs[12:16]) - copy(dstAddr[:], bs[16:20]) - addrlen = 4 - } - if ipv6 { - copy(srcAddr[:], bs[8:24]) - copy(dstAddr[:], bs[24:40]) - addrlen = 16 - } - if !s.tun.ckr.isValidLocalAddress(dstAddr, addrlen) { - // The destination address isn't in our CKR allowed range - skip = true - } else if key, err := s.tun.ckr.getPublicKeyForAddress(srcAddr, addrlen); err == nil { - if *s.conn.RemoteAddr().(*crypto.BoxPubKey) == key { - // This is the one allowed CKR case, where source and destination addresses are both good - } else { - // The CKR key associated with this address doesn't match the sender's NodeID - skip = true - } - } else { - // We have no CKR route for this source address - skip = true - } - default: - skip = true - } - if skip { - err = errors.New("address not allowed") - return - } - s.tun.writer.writeFrom(s, bs) - s.stillAlive() - return -} - -func (s *tunConn) writeFrom(from phony.Actor, bs []byte) { - s.Act(from, func() { - s._write(bs) - }) -} - -func (s *tunConn) _write(bs []byte) (err error) { - select { - case <-s.stop: - err = errors.New("session was already closed") - return - default: - } - v4 := len(bs) > 20 && bs[0]&0xf0 == 0x40 - v6 := len(bs) > 40 && bs[0]&0xf0 == 0x60 - isCGA := true - // Check source addresses - switch { - case v6 && bs[8] == 0x02 && bytes.Equal(s.tun.addr[:16], bs[8:24]): // source - case v6 && bs[8] == 0x03 && bytes.Equal(s.tun.subnet[:8], bs[8:16]): // source - default: - isCGA = false - } - // Check destiantion addresses - switch { - case v6 && bs[24] == 0x02 && bytes.Equal(s.addr[:16], bs[24:40]): // destination - case v6 && bs[24] == 0x03 && bytes.Equal(s.snet[:8], bs[24:32]): // destination - default: - isCGA = false - } - // Decide how to handle the packet - var skip bool - switch { - case isCGA: // Allowed - case s.tun.ckr.isEnabled() && (v4 || v6): - var srcAddr address.Address - var dstAddr address.Address - var addrlen int - if v4 { - copy(srcAddr[:], bs[12:16]) - copy(dstAddr[:], bs[16:20]) - addrlen = 4 - } - if v6 { - copy(srcAddr[:], bs[8:24]) - copy(dstAddr[:], bs[24:40]) - addrlen = 16 - } - if !s.tun.ckr.isValidLocalAddress(srcAddr, addrlen) { - // The source address isn't in our CKR allowed range - skip = true - } else if key, err := s.tun.ckr.getPublicKeyForAddress(dstAddr, addrlen); err == nil { - if *s.conn.RemoteAddr().(*crypto.BoxPubKey) == key { - // This is the one allowed CKR case, where source and destination addresses are both good - } else { - // The CKR key associated with this address doesn't match the sender's NodeID - skip = true - } - } else { - // We have no CKR route for this destination address... why do we have the packet in the first place? - skip = true - } - default: - skip = true - } - if skip { - err = errors.New("address not allowed") - return - } - msg := yggdrasil.FlowKeyMessage{ - FlowKey: util.GetFlowKey(bs), - Message: bs, - } - s.conn.WriteFrom(s, msg, func(err error) { - if err == nil { - // No point in wasting resources to send back an error if there was none - return - } - s.Act(s.conn, func() { - if e, eok := err.(yggdrasil.ConnError); !eok { - if e.Closed() { - s.tun.log.Debugln(s.conn.String(), "TUN/TAP generic write debug:", err) - } else { - s.tun.log.Errorln(s.conn.String(), "TUN/TAP generic write error:", err) - } - } else if e.PacketTooBig() { - // TODO: This currently isn't aware of IPv4 for CKR - ptb := &icmp.PacketTooBig{ - MTU: int(e.PacketMaximumSize()), - Data: bs[:900], - } - if packet, err := CreateICMPv6(bs[8:24], bs[24:40], ipv6.ICMPTypePacketTooBig, 0, ptb); err == nil { - s.tun.writer.writeFrom(s, packet) - } - } else { - if e.Closed() { - s.tun.log.Debugln(s.conn.String(), "TUN/TAP conn write debug:", err) - } else { - s.tun.log.Errorln(s.conn.String(), "TUN/TAP conn write error:", err) - } - } - }) - }) - s.stillAlive() - return -} - -func (s *tunConn) stillAlive() { - if s.alive != nil { - s.alive.Stop() - } - s.alive = time.AfterFunc(tunConnTimeout, s.close) -} diff --git a/src/tuntap/iface.go b/src/tuntap/iface.go index 9250665a..38f22959 100644 --- a/src/tuntap/iface.go +++ b/src/tuntap/iface.go @@ -1,9 +1,11 @@ package tuntap import ( + "fmt" + "net" + "github.com/yggdrasil-network/yggdrasil-go/src/address" "github.com/yggdrasil-network/yggdrasil-go/src/crypto" - "github.com/yggdrasil-network/yggdrasil-go/src/yggdrasil" "github.com/Arceliar/phony" ) @@ -149,60 +151,45 @@ func (tun *TunAdapter) _handlePacket(recvd []byte, err error) { // Couldn't find this node's ygg IP return } - // Do we have an active connection for this node address? - var dstString string - session, isIn := tun.addrToConn[dstAddr] - if !isIn || session == nil { - session, isIn = tun.subnetToConn[dstSnet] - if !isIn || session == nil { - // Neither an address nor a subnet mapping matched, therefore populate - // the node ID and mask to commence a search - if dstAddr.IsValid() { - dstString = dstAddr.GetNodeIDLengthString() - } else { - dstString = dstSnet.GetNodeIDLengthString() - } + + var boxPubKey *crypto.BoxPubKey + if key, ok := tun.addrToBoxPubKey[dstAddr]; ok { + boxPubKey = key + } else if key, ok := tun.subnetToBoxPubKey[dstSnet]; ok { + boxPubKey = key + } else { + var dstNodeID, dstNodeMask *crypto.NodeID + if dstAddr[0] == 0x02 { + dstNodeID, dstNodeMask = dstAddr.GetNodeIDandMask() + } else if dstAddr[0] == 0x03 { + dstNodeID, dstNodeMask = dstSnet.GetNodeIDandMask() } + if dstNodeID == nil || dstNodeMask == nil { + tun.log.Errorln("Didn't find node ID/mask") + return + } + + fmt.Println("Start search for", net.IP(dstAddr[:]).String()) + + _, boxPubKey, err = tun.core.Resolve(dstNodeID, dstNodeMask) + if err != nil { + tun.log.Errorln("tun.core.Resolve:", err) + return + } + tun.addrToBoxPubKey[dstAddr] = boxPubKey + tun.subnetToBoxPubKey[dstSnet] = boxPubKey } - // If we don't have a connection then we should open one - if !isIn || session == nil { - // Check we haven't been given empty node ID, really this shouldn't ever - // happen but just to be sure... - if dstString == "" { - panic("Given empty dstString - this shouldn't happen") - } - _, known := tun.dials[dstString] - tun.dials[dstString] = append(tun.dials[dstString], bs) - for len(tun.dials[dstString]) > 32 { - tun.dials[dstString] = tun.dials[dstString][1:] - } - if !known { - go func() { - conn, err := tun.dialer.Dial("nodeid", dstString) - tun.Act(nil, func() { - packets := tun.dials[dstString] - delete(tun.dials, dstString) - if err != nil { - return - } - // We've been given a connection so prepare the session wrapper - var tc *tunConn - if tc, err = tun._wrap(conn.(*yggdrasil.Conn)); err != nil { - // Something went wrong when storing the connection, typically that - // something already exists for this address or subnet - tun.log.Debugln("TUN iface wrap:", err) - return - } - for _, packet := range packets { - tc.writeFrom(nil, packet) - } - }) - return - }() - } + + if boxPubKey == nil { + tun.log.Errorln("No destination public key found for this packet") + return } - // If we have a connection now, try writing to it - if isIn && session != nil { - session.writeFrom(tun, bs) + + n, err = tun.packetConn.WriteTo(recvd, boxPubKey) + if err != nil { + tun.log.Errorln("tun.packetConn.WriteTo:", err) + } + if n != len(recvd) { + tun.log.Errorln("Expected to send", len(recvd), "bytes but sent", n, "bytes") } } diff --git a/src/tuntap/tun.go b/src/tuntap/tun.go index 656ecca7..3d6be276 100644 --- a/src/tuntap/tun.go +++ b/src/tuntap/tun.go @@ -37,31 +37,28 @@ const tun_IPv6_HEADER_LENGTH = 40 // should pass this object to the yggdrasil.SetRouterAdapter() function before // calling yggdrasil.Start(). type TunAdapter struct { - core *yggdrasil.Core - writer tunWriter - reader tunReader - config *config.NodeState - log *log.Logger - reconfigure chan chan error - listener *yggdrasil.Listener - dialer *yggdrasil.Dialer - addr address.Address - subnet address.Subnet - ckr cryptokey - icmpv6 ICMPv6 - mtu MTU - iface tun.Device - phony.Inbox // Currently only used for _handlePacket from the reader, TODO: all the stuff that currently needs a mutex below - //mutex sync.RWMutex // Protects the below - addrToConn map[address.Address]*tunConn - subnetToConn map[address.Subnet]*tunConn - dials map[string][][]byte // Buffer of packets to send after dialing finishes - isOpen bool + core *yggdrasil.Core + writer tunWriter + reader tunReader + config *config.NodeState + log *log.Logger + reconfigure chan chan error + packetConn net.PacketConn + addr address.Address + subnet address.Subnet + addrToBoxPubKey map[address.Address]*crypto.BoxPubKey + subnetToBoxPubKey map[address.Subnet]*crypto.BoxPubKey + ckr cryptokey + icmpv6 ICMPv6 + mtu MTU + iface tun.Device + phony.Inbox // Currently only used for _handlePacket from the reader, TODO: all the stuff that currently needs a mutex below + dials map[string][][]byte // Buffer of packets to send after dialing finishes + isOpen bool } type TunOptions struct { - Listener *yggdrasil.Listener - Dialer *yggdrasil.Dialer + PacketConn net.PacketConn } // Gets the maximum supported MTU for the platform based on the defaults in @@ -120,10 +117,9 @@ func (tun *TunAdapter) Init(core *yggdrasil.Core, config *config.NodeState, log tun.core = core tun.config = config tun.log = log - tun.listener = tunoptions.Listener - tun.dialer = tunoptions.Dialer - tun.addrToConn = make(map[address.Address]*tunConn) - tun.subnetToConn = make(map[address.Subnet]*tunConn) + tun.packetConn = tunoptions.PacketConn + tun.addrToBoxPubKey = make(map[address.Address]*crypto.BoxPubKey) + tun.subnetToBoxPubKey = make(map[address.Subnet]*crypto.BoxPubKey) tun.dials = make(map[string][][]byte) tun.writer.tun = tun tun.reader.tun = tun @@ -145,7 +141,7 @@ func (tun *TunAdapter) _start() error { return errors.New("TUN module is already started") } current := tun.config.GetCurrent() - if tun.config == nil || tun.listener == nil || tun.dialer == nil { + if tun.config == nil { return errors.New("no configuration available to TUN") } var boxPub crypto.BoxPubKey @@ -170,9 +166,20 @@ func (tun *TunAdapter) _start() error { } tun.core.SetMaximumSessionMTU(tun.MTU()) tun.isOpen = true - go tun.handler() tun.reader.Act(nil, tun.reader._read) // Start the reader tun.ckr.init(tun) + go func() { + // TODO: put this somewhere more elegant + buf := make([]byte, 65535) + for { + n, _, err := tun.packetConn.ReadFrom(buf) + if err != nil { + log.Errorln("tun.packetConn.ReadFrom:", err) + continue + } + tun.writer.writeFrom(nil, buf[:n]) + } + }() return nil } @@ -223,61 +230,3 @@ func (tun *TunAdapter) UpdateConfig(config *config.NodeConfig) { // Notify children about the configuration change tun.Act(nil, tun.ckr.configure) } - -func (tun *TunAdapter) handler() error { - for { - // Accept the incoming connection - conn, err := tun.listener.Accept() - if err != nil { - tun.log.Errorln("TUN connection accept error:", err) - return err - } - phony.Block(tun, func() { - if _, err := tun._wrap(conn.(*yggdrasil.Conn)); err != nil { - // Something went wrong when storing the connection, typically that - // something already exists for this address or subnet - tun.log.Debugln("TUN handler wrap:", err) - } - }) - } -} - -func (tun *TunAdapter) _wrap(conn *yggdrasil.Conn) (c *tunConn, err error) { - // Prepare a session wrapper for the given connection - s := tunConn{ - tun: tun, - conn: conn, - stop: make(chan struct{}), - } - c = &s - // Get the remote address and subnet of the other side - remotePubKey := conn.RemoteAddr().(*crypto.BoxPubKey) - remoteNodeID := crypto.GetNodeID(remotePubKey) - s.addr = *address.AddrForNodeID(remoteNodeID) - s.snet = *address.SubnetForNodeID(remoteNodeID) - // Work out if this is already a destination we already know about - atc, aok := tun.addrToConn[s.addr] - stc, sok := tun.subnetToConn[s.snet] - // If we know about a connection for this destination already then assume it - // is no longer valid and close it - if aok { - atc._close_from_tun() - err = errors.New("replaced connection for address") - } else if sok { - stc._close_from_tun() - err = errors.New("replaced connection for subnet") - } - // Save the session wrapper so that we can look it up quickly next time - // we receive a packet through the interface for this address - tun.addrToConn[s.addr] = &s - tun.subnetToConn[s.snet] = &s - // Set the read callback and start the timeout - conn.SetReadCallback(func(bs []byte) { - s.Act(conn, func() { - s._read(bs) - }) - }) - s.Act(nil, s.stillAlive) - // Return - return c, err -} diff --git a/src/yggdrasil/api.go b/src/yggdrasil/api.go index b5b8d362..c02c52c7 100644 --- a/src/yggdrasil/api.go +++ b/src/yggdrasil/api.go @@ -11,6 +11,7 @@ import ( "github.com/gologme/log" "github.com/yggdrasil-network/yggdrasil-go/src/address" "github.com/yggdrasil-network/yggdrasil-go/src/crypto" + "github.com/yggdrasil-network/yggdrasil-go/src/types" "github.com/Arceliar/phony" ) @@ -99,7 +100,7 @@ type Session struct { Coords []uint64 // The coordinates of the remote node BytesSent uint64 // Bytes sent to the session BytesRecvd uint64 // Bytes received from the session - MTU MTU // The maximum supported message size of the session + MTU types.MTU // The maximum supported message size of the session Uptime time.Duration // How long this session has been active for WasMTUFixed bool // This field is no longer used } @@ -225,32 +226,50 @@ func (c *Core) GetSessions() []Session { return sessions } -// ConnListen returns a listener for Yggdrasil session connections. You can only -// call this function once as each Yggdrasil node can only have a single -// ConnListener. Make sure to keep the reference to this for as long as it is -// needed. -func (c *Core) ConnListen() (*Listener, error) { - c.router.sessions.listenerMutex.Lock() - defer c.router.sessions.listenerMutex.Unlock() - if c.router.sessions.listener != nil { - return nil, errors.New("a listener already exists") - } - c.router.sessions.listener = &Listener{ - core: c, - conn: make(chan *Conn), - close: make(chan interface{}), - } - return c.router.sessions.listener, nil +// PacketConn returns a net.PacketConn which can be used to send and receive +// information over Yggdrasil sessions. +func (c *Core) PacketConn() net.PacketConn { + return c.router.sessions.packetConn } -// ConnDialer returns a dialer for Yggdrasil session connections. Since -// ConnDialers are stateless, you can request as many dialers as you like, -// although ideally you should request only one and keep the reference to it for -// as long as it is needed. -func (c *Core) ConnDialer() (*Dialer, error) { - return &Dialer{ - core: c, - }, nil +// Resolve takes a masked node ID and performs a search, returning the complete +// node ID and the node's public key. +func (c *Core) Resolve(nodeID, nodeMask *crypto.NodeID) (fullNodeID *crypto.NodeID, boxPubKey *crypto.BoxPubKey, err error) { + fmt.Println("**** START RESOLVE") + defer fmt.Println("**** END RESOLVE") + + done := make(chan struct{}) + c.router.Act(c, func() { + _, isIn := c.router.searches.searches[*nodeID] + if !isIn { + searchCompleted := func(sinfo *sessionInfo, e error) { + select { + case <-done: + // Somehow this was called multiple times, TODO don't let that happen + if sinfo != nil { + // Need to clean up to avoid a session leak + sinfo.cancel.Cancel(nil) + sinfo.sessions.removeSession(sinfo) + } + default: + if sinfo != nil { + fullNodeID = crypto.GetNodeID(&sinfo.theirPermPub) + boxPubKey = &sinfo.theirPermPub + c.router.sessions.createSession(&sinfo.theirPermPub) + } + err = e + close(done) + } + } + sinfo := c.router.searches.newIterSearch(nodeID, nodeMask, searchCompleted) + sinfo.startSearch() + } else { + err = errors.New("search already exists") + close(done) + } + }) + <-done + return } // ListenTCP starts a new TCP listener. The input URI should match that of the @@ -344,8 +363,8 @@ func (c *Core) SetNodeInfo(nodeinfo interface{}, nodeinfoprivacy bool) { } // GetMaximumSessionMTU returns the maximum allowed session MTU size. -func (c *Core) GetMaximumSessionMTU() MTU { - var mtu MTU +func (c *Core) GetMaximumSessionMTU() types.MTU { + var mtu types.MTU phony.Block(&c.router, func() { mtu = c.router.sessions.myMaximumMTU }) @@ -355,7 +374,7 @@ func (c *Core) GetMaximumSessionMTU() MTU { // SetMaximumSessionMTU sets the maximum allowed session MTU size. The default // value is 65535 bytes. Session pings will be sent to update all open sessions // if the MTU has changed. -func (c *Core) SetMaximumSessionMTU(mtu MTU) { +func (c *Core) SetMaximumSessionMTU(mtu types.MTU) { phony.Block(&c.router, func() { if c.router.sessions.myMaximumMTU != mtu { c.router.sessions.myMaximumMTU = mtu diff --git a/src/yggdrasil/conn.go b/src/yggdrasil/conn.go deleted file mode 100644 index 47e26139..00000000 --- a/src/yggdrasil/conn.go +++ /dev/null @@ -1,399 +0,0 @@ -package yggdrasil - -import ( - "errors" - "fmt" - "net" - "time" - - "github.com/yggdrasil-network/yggdrasil-go/src/crypto" - "github.com/yggdrasil-network/yggdrasil-go/src/types" - "github.com/yggdrasil-network/yggdrasil-go/src/util" - - "github.com/Arceliar/phony" -) - -type MTU = types.MTU - -// ConnError implements the net.Error interface -type ConnError struct { - error - timeout bool - temporary bool - closed bool - maxsize int -} - -// Timeout returns true if the error relates to a timeout condition on the -// connection. -func (e *ConnError) Timeout() bool { - return e.timeout -} - -// Temporary return true if the error is temporary or false if it is a permanent -// error condition. -func (e *ConnError) Temporary() bool { - return e.temporary -} - -// PacketTooBig returns in response to sending a packet that is too large, and -// if so, the maximum supported packet size that should be used for the -// connection. -func (e *ConnError) PacketTooBig() bool { - return e.maxsize > 0 -} - -// PacketMaximumSize returns the maximum supported packet size. This will only -// return a non-zero value if ConnError.PacketTooBig() returns true. -func (e *ConnError) PacketMaximumSize() int { - if !e.PacketTooBig() { - return 0 - } - return e.maxsize -} - -// Closed returns if the session is already closed and is now unusable. -func (e *ConnError) Closed() bool { - return e.closed -} - -// The Conn struct is a reference to an active connection session between the -// local node and a remote node. Conn implements the io.ReadWriteCloser -// interface and is used to send and receive traffic with a remote node. -type Conn struct { - phony.Inbox - core *Core - readDeadline *time.Time - writeDeadline *time.Time - nodeID *crypto.NodeID - nodeMask *crypto.NodeID - session *sessionInfo - mtu MTU - readCallback func([]byte) - readBuffer chan []byte -} - -// TODO func NewConn() that initializes additional fields as needed -func newConn(core *Core, nodeID *crypto.NodeID, nodeMask *crypto.NodeID, session *sessionInfo) *Conn { - conn := Conn{ - core: core, - nodeID: nodeID, - nodeMask: nodeMask, - session: session, - readBuffer: make(chan []byte, 1024), - } - return &conn -} - -// String returns a string that uniquely identifies a connection. Currently this -// takes a form similar to "conn=0x0000000", which contains a memory reference -// to the Conn object. While this value should always be unique for each Conn -// object, the format of this is not strictly defined and may change in the -// future. -func (c *Conn) String() string { - var s string - phony.Block(c, func() { s = fmt.Sprintf("conn=%p", c) }) - return s -} - -func (c *Conn) setMTU(from phony.Actor, mtu MTU) { - c.Act(from, func() { c.mtu = mtu }) -} - -// This should never be called from an actor, used in the dial functions -func (c *Conn) search() error { - var err error - done := make(chan struct{}) - phony.Block(&c.core.router, func() { - _, isIn := c.core.router.searches.searches[*c.nodeID] - if !isIn { - searchCompleted := func(sinfo *sessionInfo, e error) { - select { - case <-done: - // Somehow this was called multiple times, TODO don't let that happen - if sinfo != nil { - // Need to clean up to avoid a session leak - sinfo.cancel.Cancel(nil) - sinfo.sessions.removeSession(sinfo) - } - default: - if sinfo != nil { - // Finish initializing the session - c.session = sinfo - c.session.setConn(nil, c) - c.nodeID = crypto.GetNodeID(&c.session.theirPermPub) - for i := range c.nodeMask { - c.nodeMask[i] = 0xFF - } - } - err = e - close(done) - } - } - sinfo := c.core.router.searches.newIterSearch(c.nodeID, c.nodeMask, searchCompleted) - sinfo.startSearch() - } else { - err = errors.New("search already exists") - close(done) - } - }) - <-done - if c.session == nil && err == nil { - panic("search failed but returned no error") - } - return err -} - -// Used in session keep-alive traffic -func (c *Conn) _doSearch() { - s := fmt.Sprintf("conn=%p", c) - routerWork := func() { - // Check to see if there is a search already matching the destination - sinfo, isIn := c.core.router.searches.searches[*c.nodeID] - if !isIn { - // Nothing was found, so create a new search - searchCompleted := func(sinfo *sessionInfo, e error) {} - sinfo = c.core.router.searches.newIterSearch(c.nodeID, c.nodeMask, searchCompleted) - c.core.log.Debugf("%s DHT search started: %p", s, sinfo) - // Start the search - sinfo.startSearch() - } - } - c.core.router.Act(c.session, routerWork) -} - -func (c *Conn) _getDeadlineCancellation(t *time.Time) (util.Cancellation, bool) { - if t != nil { - // A deadline is set, so return a Cancellation that uses it - c := util.CancellationWithDeadline(c.session.cancel, *t) - return c, true - } else { - // No deadline was set, so just return the existing cancellation and a dummy value - return c.session.cancel, false - } -} - -// SetReadCallback allows you to specify a function that will be called whenever -// a packet is received. This should be used if you wish to implement -// asynchronous patterns for receiving data from the remote node. -// -// Note that if a read callback has been supplied, you should no longer attempt -// to use the synchronous Read function. -func (c *Conn) SetReadCallback(callback func([]byte)) { - c.Act(nil, func() { - c.readCallback = callback - c._drainReadBuffer() - }) -} - -func (c *Conn) _drainReadBuffer() { - if c.readCallback == nil { - return - } - select { - case bs := <-c.readBuffer: - c.readCallback(bs) - c.Act(nil, c._drainReadBuffer) // In case there's more - default: - } -} - -// Called by the session to pass a new message to the Conn -func (c *Conn) recvMsg(from phony.Actor, msg []byte) { - c.Act(from, func() { - if c.readCallback != nil { - c.readCallback(msg) - } else { - select { - case c.readBuffer <- msg: - default: - } - } - }) -} - -// Used internally by Read, the caller is responsible for util.PutBytes when they're done. -func (c *Conn) readNoCopy() ([]byte, error) { - var cancel util.Cancellation - var doCancel bool - phony.Block(c, func() { cancel, doCancel = c._getDeadlineCancellation(c.readDeadline) }) - if doCancel { - defer cancel.Cancel(nil) - } - // Wait for some traffic to come through from the session - select { - case <-cancel.Finished(): - if cancel.Error() == util.CancellationTimeoutError { - return nil, ConnError{errors.New("read timeout"), true, false, false, 0} - } else { - return nil, ConnError{errors.New("session closed"), false, false, true, 0} - } - case bs := <-c.readBuffer: - return bs, nil - } -} - -// Read allows you to read from the connection in a synchronous fashion. The -// function will block up until the point that either new data is available, the -// connection has been closed or the read deadline has been reached. If the -// function succeeds, the number of bytes read from the connection will be -// returned. Otherwise, an error condition will be returned. -// -// Note that you can also implement asynchronous reads by using SetReadCallback. -// If you do that, you should no longer attempt to use the Read function. -func (c *Conn) Read(b []byte) (int, error) { - bs, err := c.readNoCopy() - if err != nil { - return 0, err - } - n := len(bs) - if len(bs) > len(b) { - n = len(b) - err = ConnError{errors.New("read buffer too small for entire packet"), false, true, false, 0} - } - // Copy results to the output slice and clean up - copy(b, bs) - // Return the number of bytes copied to the slice, along with any error - return n, err -} - -func (c *Conn) _write(msg FlowKeyMessage) error { - if len(msg.Message) > int(c.mtu) { - return ConnError{errors.New("packet too big"), true, false, false, int(c.mtu)} - } - c.session.Act(c, func() { - // Send the packet - c.session._send(msg) - // Session keep-alive, while we wait for the crypto workers from send - switch { - case time.Since(c.session.time) > 6*time.Second: - if c.session.time.Before(c.session.pingTime) && time.Since(c.session.pingTime) > 6*time.Second { - // TODO double check that the above condition is correct - c._doSearch() - } else { - c.session.ping(c.session) // TODO send from self if this becomes an actor - } - case c.session.reset && c.session.pingTime.Before(c.session.time): - c.session.ping(c.session) // TODO send from self if this becomes an actor - default: // Don't do anything, to keep traffic throttled - } - }) - return nil -} - -// WriteFrom should be called by a phony.Actor, and tells the Conn to send a -// message. This is used internally by Write. If the callback is called with a -// non-nil value, then it is safe to reuse the argument FlowKeyMessage. -func (c *Conn) WriteFrom(from phony.Actor, msg FlowKeyMessage, callback func(error)) { - c.Act(from, func() { - callback(c._write(msg)) - }) -} - -// writeNoCopy is used internally by Write and makes use of WriteFrom under the hood. -// The caller must not reuse the argument FlowKeyMessage when a nil error is returned. -func (c *Conn) writeNoCopy(msg FlowKeyMessage) error { - var cancel util.Cancellation - var doCancel bool - phony.Block(c, func() { cancel, doCancel = c._getDeadlineCancellation(c.writeDeadline) }) - if doCancel { - defer cancel.Cancel(nil) - } - var err error - select { - case <-cancel.Finished(): - if cancel.Error() == util.CancellationTimeoutError { - err = ConnError{errors.New("write timeout"), true, false, false, 0} - } else { - err = ConnError{errors.New("session closed"), false, false, true, 0} - } - default: - done := make(chan struct{}) - callback := func(e error) { err = e; close(done) } - c.WriteFrom(nil, msg, callback) - <-done - } - return err -} - -// Write allows you to write to the connection in a synchronous fashion. This -// function may block until either the write has completed, the connection has -// been closed or the write deadline has been reached. If the function succeeds, -// the number of written bytes is returned. Otherwise, an error condition is -// returned. -func (c *Conn) Write(b []byte) (int, error) { - written := len(b) - bs := make([]byte, 0, len(b)+crypto.BoxOverhead) - bs = append(bs, b...) - msg := FlowKeyMessage{Message: bs} - err := c.writeNoCopy(msg) - if err != nil { - written = 0 - } - return written, err -} - -// Close will close an open connection and any blocking operations on the -// connection will unblock and return. From this point forward, the connection -// can no longer be used and you should no longer attempt to Read or Write to -// the connection. -func (c *Conn) Close() (err error) { - phony.Block(c, func() { - if c.session != nil { - // Close the session, if it hasn't been closed already - if e := c.session.cancel.Cancel(errors.New("connection closed")); e != nil { - err = ConnError{errors.New("close failed, session already closed"), false, false, true, 0} - } else { - c.session.doRemove() - } - } - }) - return -} - -// LocalAddr returns the complete public key of the local side of the -// connection. This is always going to return your own node's public key. -func (c *Conn) LocalAddr() net.Addr { - return &c.core.boxPub -} - -// RemoteAddr returns the complete public key of the remote side of the -// connection. -func (c *Conn) RemoteAddr() net.Addr { - if c.session != nil { - return &c.session.theirPermPub - } - return nil -} - -// SetDeadline is equivalent to calling both SetReadDeadline and -// SetWriteDeadline with the same value, configuring the maximum amount of time -// that synchronous Read and Write operations can block for. If no deadline is -// configured, Read and Write operations can potentially block indefinitely. -func (c *Conn) SetDeadline(t time.Time) error { - c.SetReadDeadline(t) - c.SetWriteDeadline(t) - return nil -} - -// SetReadDeadline configures the maximum amount of time that a synchronous Read -// operation can block for. A Read operation will unblock at the point that the -// read deadline is reached if no other condition (such as data arrival or -// connection closure) happens first. If no deadline is configured, Read -// operations can potentially block indefinitely. -func (c *Conn) SetReadDeadline(t time.Time) error { - // TODO warn that this can block while waiting for the Conn actor to run, so don't call it from other actors... - phony.Block(c, func() { c.readDeadline = &t }) - return nil -} - -// SetWriteDeadline configures the maximum amount of time that a synchronous -// Write operation can block for. A Write operation will unblock at the point -// that the read deadline is reached if no other condition (such as data sending -// or connection closure) happens first. If no deadline is configured, Write -// operations can potentially block indefinitely. -func (c *Conn) SetWriteDeadline(t time.Time) error { - // TODO warn that this can block while waiting for the Conn actor to run, so don't call it from other actors... - phony.Block(c, func() { c.writeDeadline = &t }) - return nil -} diff --git a/src/yggdrasil/dialer.go b/src/yggdrasil/dialer.go deleted file mode 100644 index 9f58d305..00000000 --- a/src/yggdrasil/dialer.go +++ /dev/null @@ -1,120 +0,0 @@ -package yggdrasil - -import ( - "context" - "encoding/hex" - "errors" - "net" - "strconv" - "strings" - "time" - - "github.com/yggdrasil-network/yggdrasil-go/src/crypto" -) - -// Dialer represents an Yggdrasil connection dialer. -type Dialer struct { - core *Core -} - -// Dial opens a session to the given node. The first parameter should be -// "curve25519" or "nodeid" and the second parameter should contain a -// hexadecimal representation of the target. It uses DialContext internally. -func (d *Dialer) Dial(network, address string) (net.Conn, error) { - return d.DialContext(nil, network, address) -} - -// DialContext is used internally by Dial, and should only be used with a -// context that includes a timeout. It uses DialByNodeIDandMask internally when -// the network is "nodeid", or DialByPublicKey when the network is "curve25519". -func (d *Dialer) DialContext(ctx context.Context, network, address string) (net.Conn, error) { - var nodeID crypto.NodeID - var nodeMask crypto.NodeID - // Process - switch network { - case "curve25519": - dest, err := hex.DecodeString(address) - if err != nil { - return nil, err - } - if len(dest) != crypto.BoxPubKeyLen { - return nil, errors.New("invalid key length supplied") - } - var pubKey crypto.BoxPubKey - copy(pubKey[:], dest) - return d.DialByPublicKey(ctx, &pubKey) - case "nodeid": - // A node ID was provided - we don't need to do anything special with it - if tokens := strings.Split(address, "/"); len(tokens) == 2 { - l, err := strconv.Atoi(tokens[1]) - if err != nil { - return nil, err - } - dest, err := hex.DecodeString(tokens[0]) - if err != nil { - return nil, err - } - copy(nodeID[:], dest) - for idx := 0; idx < l; idx++ { - nodeMask[idx/8] |= 0x80 >> byte(idx%8) - } - } else { - dest, err := hex.DecodeString(tokens[0]) - if err != nil { - return nil, err - } - copy(nodeID[:], dest) - for i := range nodeMask { - nodeMask[i] = 0xFF - } - } - return d.DialByNodeIDandMask(ctx, &nodeID, &nodeMask) - default: - // An unexpected address type was given, so give up - return nil, errors.New("unexpected address type") - } -} - -// DialByNodeIDandMask opens a session to the given node based on raw NodeID -// parameters. If ctx is nil or has no timeout, then a default timeout of 6 -// seconds will apply, beginning *after* the search finishes. -func (d *Dialer) DialByNodeIDandMask(ctx context.Context, nodeID, nodeMask *crypto.NodeID) (net.Conn, error) { - startDial := time.Now() - conn := newConn(d.core, nodeID, nodeMask, nil) - if err := conn.search(); err != nil { - // TODO: make searches take a context, so they can be cancelled early - conn.Close() - return nil, err - } - endSearch := time.Now() - d.core.log.Debugln("Dial searched for:", nodeID, "in time:", endSearch.Sub(startDial)) - conn.session.setConn(nil, conn) - var cancel context.CancelFunc - if ctx == nil { - ctx = context.Background() - } - ctx, cancel = context.WithTimeout(ctx, 6*time.Second) - defer cancel() - select { - case <-conn.session.init: - endInit := time.Now() - d.core.log.Debugln("Dial initialized session for:", nodeID, "in time:", endInit.Sub(endSearch)) - d.core.log.Debugln("Finished dial for:", nodeID, "in time:", endInit.Sub(startDial)) - return conn, nil - case <-ctx.Done(): - conn.Close() - return nil, errors.New("session handshake timeout") - } -} - -// DialByPublicKey opens a session to the given node based on the public key. If -// ctx is nil or has no timeout, then a default timeout of 6 seconds will apply, -// beginning *after* the search finishes. -func (d *Dialer) DialByPublicKey(ctx context.Context, pubKey *crypto.BoxPubKey) (net.Conn, error) { - nodeID := crypto.GetNodeID(pubKey) - var nodeMask crypto.NodeID - for i := range nodeMask { - nodeMask[i] = 0xFF - } - return d.DialByNodeIDandMask(ctx, nodeID, &nodeMask) -} diff --git a/src/yggdrasil/listener.go b/src/yggdrasil/listener.go deleted file mode 100644 index 74ef3e88..00000000 --- a/src/yggdrasil/listener.go +++ /dev/null @@ -1,45 +0,0 @@ -package yggdrasil - -import ( - "errors" - "net" -) - -// Listener waits for incoming sessions -type Listener struct { - core *Core - conn chan *Conn - close chan interface{} -} - -// Accept blocks until a new incoming session is received -func (l *Listener) Accept() (net.Conn, error) { - select { - case c, ok := <-l.conn: - if !ok { - return nil, errors.New("listener closed") - } - return c, nil - case <-l.close: - return nil, errors.New("listener closed") - } -} - -// Close will stop the listener -func (l *Listener) Close() (err error) { - defer func() { - recover() - err = errors.New("already closed") - }() - if l.core.router.sessions.listener == l { - l.core.router.sessions.listener = nil - } - close(l.close) - close(l.conn) - return nil -} - -// Addr returns the address of the listener -func (l *Listener) Addr() net.Addr { - return &l.core.boxPub -} diff --git a/src/yggdrasil/packetconn.go b/src/yggdrasil/packetconn.go new file mode 100644 index 00000000..be9c2043 --- /dev/null +++ b/src/yggdrasil/packetconn.go @@ -0,0 +1,175 @@ +package yggdrasil + +import ( + "errors" + "net" + "time" + + "github.com/Arceliar/phony" + "github.com/yggdrasil-network/yggdrasil-go/src/crypto" + "github.com/yggdrasil-network/yggdrasil-go/src/types" +) + +type packet struct { + addr *crypto.BoxPubKey + payload []byte +} + +type PacketConn struct { + phony.Inbox + net.PacketConn + closed bool + readBuffer chan packet + sessions *sessions +} + +func newPacketConn(ss *sessions) *PacketConn { + return &PacketConn{ + sessions: ss, + readBuffer: make(chan packet), + } +} + +// implements net.PacketConn +func (c *PacketConn) ReadFrom(b []byte) (n int, addr net.Addr, err error) { + if c.closed { + return 0, nil, PacketConnError{closed: true} + } + packet := <-c.readBuffer + copy(b, packet.payload) + return len(packet.payload), packet.addr, nil +} + +// implements net.PacketConn +func (c *PacketConn) WriteTo(b []byte, addr net.Addr) (int, error) { + if c.closed { + return 0, PacketConnError{closed: true} + } + + boxPubKey, ok := addr.(*crypto.BoxPubKey) + if !ok { + return 0, errors.New("expected crypto.BoxPubKey as net.Addr") + } + + session, ok := c.sessions.getByTheirPerm(boxPubKey) + if !ok { + return 0, errors.New("expected a session but there was none") + } + + err := make(chan error, 1) + msg := FlowKeyMessage{Message: b} + nodeID := crypto.GetNodeID(boxPubKey) + nodeMask := &crypto.NodeID{} + for i := range nodeMask { + nodeMask[i] = 0xFF + } + + session.Act(c, func() { + // Check if the packet is small enough to go through this session + sessionMTU := session._getMTU() + if types.MTU(len(b)) > sessionMTU { + err <- PacketConnError{maxsize: int(sessionMTU)} + return + } + + // Send the packet + session._send(msg) + + // Session keep-alive, while we wait for the crypto workers from send + switch { + case time.Since(session.time) > 6*time.Second: + if session.time.Before(session.pingTime) && time.Since(session.pingTime) > 6*time.Second { + // TODO double check that the above condition is correct + c.sessions.router.Act(session, func() { + // Check to see if there is a search already matching the destination + sinfo, isIn := c.sessions.router.searches.searches[*nodeID] + if !isIn { + // Nothing was found, so create a new search + searchCompleted := func(sinfo *sessionInfo, e error) {} + sinfo = c.sessions.router.searches.newIterSearch(nodeID, nodeMask, searchCompleted) + c.sessions.router.core.log.Debugf("DHT search started: %p", sinfo) + // Start the search + sinfo.startSearch() + } + }) + } else { + session.ping(session) // TODO send from self if this becomes an actor + } + case session.reset && session.pingTime.Before(session.time): + session.ping(session) // TODO send from self if this becomes an actor + default: // Don't do anything, to keep traffic throttled + } + + err <- nil + }) + + e := <-err + return len(b), e +} + +// implements net.PacketConn +func (c *PacketConn) Close() error { + return nil +} + +// implements net.PacketConn +func (c *PacketConn) LocalAddr() net.Addr { + return &c.sessions.router.core.boxPub +} + +// implements net.PacketConn +func (c *PacketConn) SetDeadline(t time.Time) error { + return nil +} + +// implements net.PacketConn +func (c *PacketConn) SetReadDeadline(t time.Time) error { + return nil +} + +// implements net.PacketConn +func (c *PacketConn) SetWriteDeadline(t time.Time) error { + return nil +} + +// PacketConnError implements the net.Error interface +type PacketConnError struct { + error + timeout bool + temporary bool + closed bool + maxsize int +} + +// Timeout returns true if the error relates to a timeout condition on the +// connection. +func (e *PacketConnError) Timeout() bool { + return e.timeout +} + +// Temporary return true if the error is temporary or false if it is a permanent +// error condition. +func (e *PacketConnError) Temporary() bool { + return e.temporary +} + +// PacketTooBig returns in response to sending a packet that is too large, and +// if so, the maximum supported packet size that should be used for the +// connection. +func (e *PacketConnError) PacketTooBig() bool { + return e.maxsize > 0 +} + +// PacketMaximumSize returns the maximum supported packet size. This will only +// return a non-zero value if ConnError.PacketTooBig() returns true. +func (e *PacketConnError) PacketMaximumSize() int { + if !e.PacketTooBig() { + return 0 + } + return e.maxsize +} + +// Closed returns if the session is already closed and is now unusable. +func (e *PacketConnError) Closed() bool { + return e.closed +} diff --git a/src/yggdrasil/session.go b/src/yggdrasil/session.go index 360f2a1b..16828052 100644 --- a/src/yggdrasil/session.go +++ b/src/yggdrasil/session.go @@ -11,6 +11,7 @@ import ( "github.com/yggdrasil-network/yggdrasil-go/src/address" "github.com/yggdrasil-network/yggdrasil-go/src/crypto" + "github.com/yggdrasil-network/yggdrasil-go/src/types" "github.com/yggdrasil-network/yggdrasil-go/src/util" "github.com/Arceliar/phony" @@ -33,8 +34,8 @@ type sessionInfo struct { myHandle crypto.Handle // theirNonce crypto.BoxNonce // myNonce crypto.BoxNonce // - theirMTU MTU // - myMTU MTU // + theirMTU types.MTU // + myMTU types.MTU // wasMTUFixed bool // Was the MTU fixed by a receive error? timeOpened time.Time // Time the session was opened time time.Time // Time we last received a packet @@ -47,7 +48,7 @@ type sessionInfo struct { bytesRecvd uint64 // Bytes of real traffic received in this session init chan struct{} // Closed when the first session pong arrives, used to signal that the session is ready for initial use cancel util.Cancellation // Used to terminate workers - conn *Conn // The associated Conn object + conn *PacketConn // The associated PacketConn object callbacks []chan func() // Finished work from crypto workers table *lookupTable // table.self is a locator where we get our coords } @@ -60,7 +61,7 @@ type sessionPing struct { Coords []byte // Tstamp int64 // unix time, but the only real requirement is that it increases IsPong bool // - MTU MTU // + MTU types.MTU // } // Updates session info in response to a ping, after checking that the ping is OK. @@ -83,9 +84,6 @@ func (s *sessionInfo) _update(p *sessionPing) bool { } if p.MTU >= 1280 || p.MTU == 0 { s.theirMTU = p.MTU - if s.conn != nil { - s.conn.setMTU(s, s._getMTU()) - } } if !bytes.Equal(s.coords, p.Coords) { // allocate enough space for additional coords @@ -109,12 +107,11 @@ func (s *sessionInfo) _update(p *sessionPing) bool { // Additionally, stores maps of address/subnet onto keys, and keys onto handles. type sessions struct { router *router - listener *Listener - listenerMutex sync.Mutex + packetConn *PacketConn lastCleanup time.Time isAllowedHandler func(pubkey *crypto.BoxPubKey, initiator bool) bool // Returns true or false if session setup is allowed isAllowedMutex sync.RWMutex // Protects the above - myMaximumMTU MTU // Maximum allowed session MTU + myMaximumMTU types.MTU // Maximum allowed session MTU permShared map[crypto.BoxPubKey]*crypto.BoxSharedKey // Maps known permanent keys to their shared key, used by DHT a lot sinfos map[crypto.Handle]*sessionInfo // Maps handle onto session info byTheirPerm map[crypto.BoxPubKey]*crypto.Handle // Maps theirPermPub onto handle @@ -123,6 +120,7 @@ type sessions struct { // Initializes the session struct. func (ss *sessions) init(r *router) { ss.router = r + ss.packetConn = newPacketConn(ss) ss.permShared = make(map[crypto.BoxPubKey]*crypto.BoxSharedKey) ss.sinfos = make(map[crypto.Handle]*sessionInfo) ss.byTheirPerm = make(map[crypto.BoxPubKey]*crypto.Handle) @@ -330,13 +328,6 @@ func (sinfo *sessionInfo) _sendPingPong(isPong bool) { } } -func (sinfo *sessionInfo) setConn(from phony.Actor, conn *Conn) { - sinfo.Act(from, func() { - sinfo.conn = conn - sinfo.conn.setMTU(sinfo, sinfo._getMTU()) - }) -} - // Handles a session ping, creating a session if needed and calling update, then possibly responding with a pong if the ping was in ping mode and the update was successful. // If the session has a packet cached (common when first setting up a session), it will be sent. func (ss *sessions) handlePing(ping *sessionPing) { @@ -347,23 +338,12 @@ func (ss *sessions) handlePing(ping *sessionPing) { case isIn: // Session already exists case !ss.isSessionAllowed(&ping.SendPermPub, false): // Session is not allowed default: - ss.listenerMutex.Lock() - if ss.listener != nil { - // This is a ping from an allowed node for which no session exists, and we have a listener ready to handle sessions. - // We need to create a session and pass it to the listener. - sinfo = ss.createSession(&ping.SendPermPub) - if s, _ := ss.getByTheirPerm(&ping.SendPermPub); s != sinfo { - panic("This should not happen") - } - conn := newConn(ss.router.core, crypto.GetNodeID(&sinfo.theirPermPub), &crypto.NodeID{}, sinfo) - for i := range conn.nodeMask { - conn.nodeMask[i] = 0xFF - } - sinfo.setConn(ss.router, conn) - c := ss.listener.conn - go func() { c <- conn }() + // This is a ping from an allowed node for which no session exists, and we have a listener ready to handle sessions. + // We need to create a session and pass it to the listener. + sinfo = ss.createSession(&ping.SendPermPub) + if s, _ := ss.getByTheirPerm(&ping.SendPermPub); s != sinfo { + panic("This should not happen") } - ss.listenerMutex.Unlock() } if sinfo != nil { sinfo.Act(ss.router, func() { @@ -381,7 +361,7 @@ func (ss *sessions) handlePing(ping *sessionPing) { // Get the MTU of the session. // Will be equal to the smaller of this node's MTU or the remote node's MTU. // If sending over links with a maximum message size (this was a thing with the old UDP code), it could be further lowered, to a minimum of 1280. -func (sinfo *sessionInfo) _getMTU() MTU { +func (sinfo *sessionInfo) _getMTU() types.MTU { if sinfo.theirMTU == 0 || sinfo.myMTU == 0 { return 0 } @@ -467,7 +447,10 @@ func (sinfo *sessionInfo) _recvPacket(p *wire_trafficPacket) { } sinfo._updateNonce(&p.Nonce) sinfo.bytesRecvd += uint64(len(bs)) - sinfo.conn.recvMsg(sinfo, bs) + sinfo.sessions.packetConn.readBuffer <- packet{ + addr: &sinfo.theirPermPub, + payload: bs, + } } ch <- callback sinfo.checkCallbacks() diff --git a/src/yggdrasil/wire.go b/src/yggdrasil/wire.go index 9746d46e..d41281e4 100644 --- a/src/yggdrasil/wire.go +++ b/src/yggdrasil/wire.go @@ -9,6 +9,7 @@ package yggdrasil import ( "github.com/yggdrasil-network/yggdrasil-go/src/crypto" + "github.com/yggdrasil-network/yggdrasil-go/src/types" ) const ( @@ -382,7 +383,7 @@ func (p *sessionPing) decode(bs []byte) bool { if pType == wire_SessionPong { p.IsPong = true } - p.MTU = MTU(mtu) + p.MTU = types.MTU(mtu) return true }